hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cdoug...@apache.org
Subject svn commit: r662634 - in /hadoop/core/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/MapTask.java
Date Tue, 03 Jun 2008 04:19:26 GMT
Author: cdouglas
Date: Mon Jun  2 21:19:25 2008
New Revision: 662634

URL: http://svn.apache.org/viewvc?rev=662634&view=rev
Log:
HADOOP-3443. Avoid copying map output across partitions when renaming a
single spill. Contributed by Owen O'Malley.


Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=662634&r1=662633&r2=662634&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Jun  2 21:19:25 2008
@@ -391,6 +391,9 @@
     HADOOP-3475. Fix MapTask to correctly size the accounting allocation of
     io.sort.mb. (cdouglas)
 
+    HADOOP-3443. Avoid copying map output across partitions when renaming a
+    single spill. (omalley via cdouglas)
+
 Release 0.17.0 - 2008-05-18
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?rev=662634&r1=662633&r2=662634&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Mon Jun  2 21:19:25 2008
@@ -35,7 +35,6 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -58,7 +57,6 @@
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.io.serializer.Serializer;
-import org.apache.hadoop.mapred.ReduceTask.ValuesIterator;
 import org.apache.hadoop.util.IndexedSortable;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.QuickSort;
@@ -174,7 +172,7 @@
     public float getProgress() throws IOException {
       return rawIn.getProgress();
     }
-  };
+  }
 
   @Override
   @SuppressWarnings("unchecked")
@@ -366,6 +364,7 @@
       if ((sortmb & 0x7FF) != sortmb) {
         throw new IOException("Invalid \"io.sort.mb\": " + sortmb);
       }
+      LOG.info("io.sort.mb = " + sortmb);
       // buffers and accounting
       int maxMemUsage = sortmb << 20;
       int recordCapacity = (int)(maxMemUsage * recper);
@@ -377,6 +376,8 @@
       kvindices = new int[recordCapacity * ACCTSIZE];
       softBufferLimit = (int)(kvbuffer.length * spillper);
       softRecordLimit = (int)(kvoffsets.length * spillper);
+      LOG.info("data buffer = " + softBufferLimit + "/" + kvbuffer.length);
+      LOG.info("record buffer = " + softRecordLimit + "/" + kvoffsets.length);
       // k/v serialization
       comparator = job.getOutputKeyComparator();
       keyClass = job.getMapOutputKeyClass();
@@ -643,6 +644,12 @@
                   ? bufindex - bufend > softBufferLimit
                   : bufend - bufindex < bufvoid - softBufferLimit;
                 if (kvsoftlimit || bufsoftlimit || (buffull && !wrap)) {
+                  LOG.info("Spilling map output: buffer full = " + bufsoftlimit+
+                           " and record full = " + kvsoftlimit);
+                  LOG.info("bufindex = " + bufindex + "; bufend = " + bufend +
+                           "; bufvoid = " + bufvoid);
+                  LOG.info("kvindex = " + kvindex + "; kvend = " + kvend +
+                           "; length = " + kvoffsets.length);
                   kvend = kvindex;
                   bufend = bufmark;
                   // TODO No need to recreate this thread every time
@@ -693,6 +700,7 @@
     }
 
     public synchronized void flush() throws IOException {
+      LOG.info("Starting flush of map output");
       synchronized (spillLock) {
         while (kvstart != kvend) {
           try {
@@ -815,6 +823,7 @@
           }
         }
         ++numSpills;
+        LOG.info("Finished spill " + numSpills);
       } finally {
         if (out != null) out.close();
         if (indexOut != null) indexOut.close();
@@ -976,7 +985,15 @@
       for(int i = 0; i < numSpills; i++) {
         filename[i] = mapOutputFile.getSpillFile(getTaskID(), i);
         indexFileName[i] = mapOutputFile.getSpillIndexFile(getTaskID(), i);
-        finalOutFileSize += localFs.getLength(filename[i]);
+        finalOutFileSize += localFs.getFileStatus(filename[i]).getLen();
+      }
+      
+      if (numSpills == 1) { //the spill is the final output
+        localFs.rename(filename[0], 
+                       new Path(filename[0].getParent(), "file.out"));
+        localFs.rename(indexFileName[0], 
+                       new Path(indexFileName[0].getParent(),"file.out.index"));
+        return;
       }
       //make correction in the length to include the sequence file header
       //lengths for each partition
@@ -989,12 +1006,6 @@
       Path finalIndexFile = mapOutputFile.getOutputIndexFileForWrite(
                             getTaskID(), finalIndexFileSize);
       
-      if (numSpills == 1) { //the spill is the final output
-        localFs.rename(filename[0], finalOutputFile);
-        localFs.rename(indexFileName[0], finalIndexFile);
-        return;
-      }
-      
       //The output stream for the final single output file
       FSDataOutputStream finalOut = localFs.create(finalOutputFile, true, 
                                                    4096);



Mime
View raw message