hadoop-common-commits mailing list archives

From: cutt...@apache.org
Subject: svn commit: r511993 - in /lucene/hadoop/trunk: CHANGES.txt conf/hadoop-default.xml src/java/org/apache/hadoop/fs/InMemoryFileSystem.java src/java/org/apache/hadoop/io/SequenceFile.java src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
Date: Mon, 26 Feb 2007 20:35:58 GMT
Author: cutting
Date: Mon Feb 26 12:35:57 2007
New Revision: 511993

URL: http://svn.apache.org/viewvc?view=rev&rev=511993
Log:
HADOOP-1027.  Fix problems with in-memory merging during shuffle and re-enable this optimization.  Contributed by Devaraj.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/conf/hadoop-default.xml
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=511993&r1=511992&r2=511993
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon Feb 26 12:35:57 2007
@@ -141,6 +141,9 @@
 41. HADOOP-1040.  Update RandomWriter example to use counters and
     user-defined input and output formats.  (omalley via cutting)
 
+42. HADOOP-1027.  Fix problems with in-memory merging during shuffle
+    and re-enable this optimization.  (Devaraj Das via cutting)
+
 
 Release 0.11.2 - 2007-02-16
 

Modified: lucene/hadoop/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?view=diff&rev=511993&r1=511992&r2=511993
==============================================================================
--- lucene/hadoop/trunk/conf/hadoop-default.xml (original)
+++ lucene/hadoop/trunk/conf/hadoop-default.xml Mon Feb 26 12:35:57 2007
@@ -132,7 +132,7 @@
 
 <property>
   <name>fs.inmemory.size.mb</name>
-  <value>0</value>
+  <value>75</value>
  <description>The size of the in-memory filesystem instance in MB</description>
 </property>
 
@@ -556,6 +556,17 @@
   /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
 
         -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
+  </description>
+</property>
+
+<property>
+  <name>mapred.inmem.merge.threshold</name>
+  <value>1000</value>
+  <description>The threshold, in terms of the number of files, for the
+  in-memory merge process. When we accumulate the threshold number of files
+  we initiate the in-memory merge and spill to disk. A value of 0 or less
+  indicates that we don't want any threshold and instead depend only on
+  the ramfs's memory consumption to trigger the merge.
   </description>
 </property>
 

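For reference, the two properties touched above can also be overridden per job. A minimal sketch, assuming a standard JobConf is on hand; the property names come from this change, while the values and the class name are only illustrative:

    // Illustrative per-job override of the shuffle settings touched above.
    // The property names come from this change; the values are examples only.
    import org.apache.hadoop.mapred.JobConf;

    public class InMemShuffleConfig {
      public static void configure(JobConf job) {
        // Size, in MB, of the in-memory filesystem that buffers map outputs.
        job.setInt("fs.inmemory.size.mb", 75);
        // Number of accumulated in-memory files that triggers a merge/spill;
        // 0 or a negative value means rely only on ramfs memory consumption.
        job.setInt("mapred.inmem.merge.threshold", 1000);
      }
    }
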
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java?view=diff&rev=511993&r1=511992&r2=511993
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java Mon Feb 26 12:35:57 2007
@@ -98,10 +98,13 @@
     private FileAttributes fAttr;
     
     public InMemoryInputStream(Path f) throws IOException {
-      fAttr = pathToFileAttribs.get(getPath(f));
-      if (fAttr == null) throw new FileNotFoundException("File " + f + 
-                                                         " does not exist");
-      din.reset(fAttr.data, 0, fAttr.size);
+      synchronized (InMemoryFileSystem.this) {
+        fAttr = pathToFileAttribs.get(getPath(f));
+        if (fAttr == null) { 
+          throw new FileNotFoundException("File " + f + " does not exist");
+        }                            
+        din.reset(fAttr.data, 0, fAttr.size);
+      }
     }
     
     public long getPos() throws IOException {
@@ -214,12 +217,16 @@
 
   public void close() throws IOException {
     super.close();
-    if (pathToFileAttribs != null) 
-      pathToFileAttribs.clear();
-    pathToFileAttribs = null;
-    if (tempFileAttribs != null)
-      tempFileAttribs.clear();
-    tempFileAttribs = null;
+    synchronized (this) {
+      if (pathToFileAttribs != null) { 
+        pathToFileAttribs.clear();
+      }
+      pathToFileAttribs = null;
+      if (tempFileAttribs != null) {
+        tempFileAttribs.clear();
+      }
+      tempFileAttribs = null;
+    }
   }
 
   /**
@@ -236,6 +243,9 @@
 
   public boolean renameRaw(Path src, Path dst) throws IOException {
     synchronized (this) {
+      if (exists(dst)) {
+        throw new IOException ("Path " + dst + " already exists");
+      }
       FileAttributes fAttr = pathToFileAttribs.remove(getPath(src));
       if (fAttr == null) return false;
       pathToFileAttribs.put(getPath(dst), fAttr);
@@ -256,7 +266,9 @@
   }
 
   public boolean exists(Path f) throws IOException {
-    return pathToFileAttribs.containsKey(getPath(f));
+    synchronized (this) {
+      return pathToFileAttribs.containsKey(getPath(f));
+    }
   }
   
   /**
@@ -267,7 +279,9 @@
   }
 
   public long getLength(Path f) throws IOException {
-    return pathToFileAttribs.get(getPath(f)).size;
+    synchronized (this) {
+      return pathToFileAttribs.get(getPath(f)).size;
+    }
   }
   
   /**
@@ -363,13 +377,18 @@
   public Path[] getFiles(PathFilter filter) {
     synchronized (this) {
       List <String> closedFilesList = new ArrayList();
-      Set paths = pathToFileAttribs.keySet();
-      if (paths == null || paths.isEmpty()) return new Path[0];
-      Iterator iter = paths.iterator();
-      while (iter.hasNext()) {
-        String f = (String)iter.next();
-        if (filter.accept(new Path(f)))
-          closedFilesList.add(f);
+      synchronized (pathToFileAttribs) {
+        Set paths = pathToFileAttribs.keySet();
+        if (paths == null || paths.isEmpty()) {
+          return new Path[0];
+        }
+        Iterator iter = paths.iterator();
+        while (iter.hasNext()) {
+          String f = (String)iter.next();
+          if (filter.accept(new Path(f))) {
+            closedFilesList.add(f);
+          }
+        }
       }
       String [] names = 
         closedFilesList.toArray(new String[closedFilesList.size()]);
@@ -381,6 +400,10 @@
     }
   }
   
+  public int getNumFiles(PathFilter filter) {
+    return getFiles(filter).length;
+  }
+
   public int getFSSize() {
     return fsSize;
   }

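The InMemoryFileSystem changes all apply one pattern: every access to the shared path-to-attributes map is wrapped in a block synchronized on the filesystem instance, so the copier threads and the in-memory merge thread never observe the map mid-update. A minimal sketch of that locking discipline, using placeholder names (InMemFsSketch, pathToData) rather than the real Hadoop types:

    // Sketch of the locking discipline applied in this commit; the class and
    // field names below are placeholders, not the actual Hadoop types.
    import java.io.FileNotFoundException;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    class InMemFsSketch {
      private Map<String, byte[]> pathToData = new HashMap<String, byte[]>();

      // Readers synchronize on the filesystem instance...
      byte[] open(String path) throws IOException {
        synchronized (this) {
          byte[] data = pathToData.get(path);
          if (data == null) {
            throw new FileNotFoundException("File " + path + " does not exist");
          }
          return data;
        }
      }

      // ...and so do writers, so a close() or rename cannot race an open().
      void close() {
        synchronized (this) {
          pathToData.clear();
          pathToData = null;
        }
      }
    }
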
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?view=diff&rev=511993&r1=511992&r2=511993
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Mon Feb 26 12:35:57 2007
@@ -2381,14 +2381,37 @@
         do {
           //get the factor for this pass of merge
           factor = getPassFactor(passNo, numSegments);
-          //extract the smallest 'factor' number of segment pointers from the 
-          //TreeMap
-          SegmentDescriptor[] mStream = getSegmentDescriptors(factor);
-          
+          List <SegmentDescriptor> segmentsToMerge = new ArrayList();
+          int segmentsConsidered = 0;
+          int numSegmentsToConsider = factor;
+          while (true) {
+            //extract the smallest 'factor' number of segment pointers from the 
+            //TreeMap. Call cleanup on the empty segments (no key/value data)
+            SegmentDescriptor[] mStream = 
+              getSegmentDescriptors(numSegmentsToConsider);
+            for (int i = 0; i < mStream.length; i++) {
+              if (mStream[i].nextRawKey()) {
+                segmentsToMerge.add(mStream[i]);
+                segmentsConsidered++;
+              }
+              else {
+                mStream[i].cleanup();
+                numSegments--; //we ignore this segment for the merge
+              }
+            }
+            //if we have the desired number of segments
+            //or looked at all available segments, we break
+            if (segmentsConsidered == factor || 
+                sortedSegmentSizes.size() == 0) {
+              break;
+            }
+              
+            numSegmentsToConsider = factor - segmentsConsidered;
+          }
           //feed the streams to the priority queue
-          initialize(mStream.length); clear();
-          for (int i = 0; i < mStream.length; i++) {
-            if (mStream[i].nextRawKey()) put(mStream[i]);
+          initialize(segmentsToMerge.size()); clear();
+          for (int i = 0; i < segmentsToMerge.size(); i++) {
+            put(segmentsToMerge.get(i));
           }
           //if we have lesser number of segments remaining, then just return the
           //iterator, else do another single level merge
@@ -2396,10 +2419,13 @@
             //calculate the length of the remaining segments. Required for 
             //calculating the merge progress
             long totalBytes = 0;
-            for (int i = 0; i < numSegments; i++)
-              totalBytes += mStream[i].segmentLength;
+            for (int i = 0; i < segmentsToMerge.size(); i++) {
+              totalBytes += segmentsToMerge.get(i).segmentLength;
+            }
             if (totalBytes != 0) //being paranoid
               progPerByte = 1.0f / (float)totalBytes;
+            //reset factor to what it originally was
+            factor = origFactor;
             return this;
           } else {
             //we want to spread the creation of temp files on multiple disks if 
@@ -2410,8 +2436,8 @@
                                                 tmpFilename.toString());
             LOG.info("writing intermediate results to " + outputFile);
             Writer writer = cloneFileAttributes(
-                            fs.makeQualified(mStream[0].segmentPathName), 
-                            fs.makeQualified(outputFile), null);
+                      fs.makeQualified(segmentsToMerge.get(0).segmentPathName), 
+                      fs.makeQualified(outputFile), null);
             writer.sync = null; //disable sync for temp files
             writeFile(this, writer);
             writer.close();
@@ -2420,17 +2446,6 @@
             //queue
             this.close();
             
-            //this is required to handle the corner case where we have empty
-            //map outputs to merge. The empty map outputs will just have the 
-            //sequence file header; they won't be inserted in the priority 
-            //queue. Thus, they won't be deleted in the regular process where 
-            //cleanup happens when a stream is popped off (when the key/value
-            //from that stream has been iterated over) from the queue.
-            for (int i = 0; i < mStream.length; i++) {
-              if (mStream[i].in != null) //true if cleanup didn't happen
-                mStream[i].cleanup();
-            }
-
             SegmentDescriptor tempSegment = 
                  new SegmentDescriptor(0, fs.getLength(outputFile), outputFile);
             //put the segment back in the TreeMap
@@ -2528,8 +2543,12 @@
        */
       public boolean nextRawKey() throws IOException {
         if (in == null) {
+        int bufferSize = conf.getInt("io.file.buffer.size", 4096); 
+        if (fs.getUri().getScheme().startsWith("ramfs")) {
+          bufferSize = conf.getInt("io.bytes.per.checksum", 512);
+        }
         Reader reader = new Reader(fs, segmentPathName, 
-            conf.getInt("io.file.buffer.size", 4096), segmentOffset, 
+            bufferSize, segmentOffset, 
                 segmentLength, conf);
         
         //sometimes we ignore syncs especially for temp merge files

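The heart of the SequenceFile change is the loop that replaces the single getSegmentDescriptors(factor) call: it keeps drawing segments until it has collected 'factor' non-empty ones or none remain, and empty map outputs are cleaned up on the spot instead of lingering until the priority queue would discard them. A simplified sketch of that selection logic against a hypothetical Segment interface, where takeSmallest and remaining stand in for getSegmentDescriptors and sortedSegmentSizes:

    // Simplified restatement of the segment-selection loop added above.
    // Segment, takeSmallest() and remaining() are hypothetical stand-ins for
    // SegmentDescriptor, getSegmentDescriptors() and sortedSegmentSizes.
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    interface Segment {
      boolean nextRawKey() throws IOException;  // false if the segment is empty
      void cleanup() throws IOException;        // close and delete the segment
    }

    abstract class MergeSketch {
      abstract Segment[] takeSmallest(int n);   // pop up to n smallest segments
      abstract int remaining();                 // segments still queued

      List<Segment> selectSegments(int factor) throws IOException {
        List<Segment> toMerge = new ArrayList<Segment>();
        int wanted = factor;
        while (true) {
          for (Segment s : takeSmallest(wanted)) {
            if (s.nextRawKey()) {
              toMerge.add(s);                   // has data: include in this pass
            } else {
              s.cleanup();                      // empty map output: delete it now
            }
          }
          if (toMerge.size() == factor || remaining() == 0) {
            return toMerge;                     // enough segments, or none left
          }
          wanted = factor - toMerge.size();     // top up with more candidates
        }
      }
    }
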
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java?view=diff&rev=511993&r1=511992&r2=511993
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java Mon Feb 26 12:35:57 2007
@@ -115,6 +115,11 @@
   private volatile boolean mergeInProgress = false;
 
   /**
+   * When we accumulate merge_threshold number of files in ram, we merge/spill
+   */
+  private int mergeThreshold = 500;
+  
+  /**
    * The threads for fetching the files.
    */
   private MapOutputCopier[] copiers = null;
@@ -316,8 +321,11 @@
                  " output from " + loc.getHost() + ".");
         //Create a thread to do merges. Synchronize access/update to 
         //mergeInProgress
-        if (!mergeInProgress && inMemFileSys.getPercentUsed() >= 
-                                                       MAX_INMEM_FILESYS_USE) {
+        if (!mergeInProgress && 
+            (inMemFileSys.getPercentUsed() >= MAX_INMEM_FILESYS_USE || 
+             (mergeThreshold > 0 && 
+              inMemFileSys.getNumFiles(MAP_OUTPUT_FILTER) >= mergeThreshold))&&
+            mergeThrowable == null) {
           LOG.info(reduceId + " InMemoryFileSystem " + 
                    inMemFileSys.getUri().toString() +
                    " is " + inMemFileSys.getPercentUsed() + 
@@ -383,6 +391,7 @@
     this.copyResults = new ArrayList(100);    
     this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5);
     this.maxBackoff = conf.getInt("mapred.reduce.copy.backoff", 300);
+    this.mergeThreshold = conf.getInt("mapred.inmem.merge.threshold", 1000);
 
     //we want to distinguish inmem fs instances for different reduces. Hence,
     //append a unique string in the uri for the inmem fs name
@@ -619,20 +628,6 @@
       }
     }
     
-    if (mergeThrowable != null) {
-      //set the task state to FAILED
-      TaskTracker tracker = ReduceTaskRunner.this.getTracker();
-      TaskTracker.TaskInProgress tip = 
-        tracker.runningTasks.get(reduceTask.getTaskId());
-      tip.runstate = TaskStatus.State.FAILED;
-      try {
-        tip.cleanup();
-      } catch (Throwable ie2) {
-        // Ignore it, we are just trying to cleanup.
-      }
-      inMemFileSys.close();
-    }
-    
     //Do a merge of in-memory files (if there are any)
     if (!killed && mergeThrowable == null) {
       try {
@@ -644,6 +639,11 @@
                  " Copying of all map outputs complete. " + 
                  "Initiating the last merge on the remaining files in " + 
                  inMemFileSys.getUri());
+        if (mergeThrowable != null) {
+          //this could happen if the merge that
+          //was in progress threw an exception
+          throw mergeThrowable;
+        }
         //initiate merge
         Path[] inMemClosedFiles = inMemFileSys.getFiles(MAP_OUTPUT_FILTER);
         if (inMemClosedFiles.length == 0) {
@@ -651,16 +651,28 @@
               inMemFileSys.getUri());
           return numCopied == numOutputs;
         }
-        RawKeyValueIterator rIter = 
-          sorter.merge(inMemClosedFiles, true, inMemClosedFiles.length, 
-                       new Path(reduceTask.getTaskId()));
         //name this output file same as the name of the first file that is 
         //there in the current list of inmem files (this is guaranteed to be
         //absent on the disk currently. So we don't overwrite a prev. 
-        //created spill)
+        //created spill). Also we need to create the output file now since
+        //it is not guaranteed that this file will be present after merge
+        //is called (we delete empty sequence files as soon as we see them
+        //in the merge method)
         SequenceFile.Writer writer = sorter.cloneFileAttributes(
             inMemFileSys.makeQualified(inMemClosedFiles[0]), 
             localFileSys.makeQualified(inMemClosedFiles[0]), null);
+        
+        RawKeyValueIterator rIter = null;
+        try {
+          rIter = sorter.merge(inMemClosedFiles, true, inMemClosedFiles.length, 
+                       new Path(reduceTask.getTaskId()));
+        } catch (Exception e) { 
+          //make sure that we delete the ondisk file that we created earlier
+          //when we invoked cloneFileAttributes
+          writer.close();
+          localFileSys.delete(inMemClosedFiles[0]);
+          throw new IOException (StringUtils.stringifyException(e));
+        }
         sorter.writeFile(rIter, writer);
         writer.close();
         LOG.info(reduceTask.getTaskId() +
@@ -668,14 +680,15 @@
                  " files in InMemoryFileSystem complete." +
                  " Local file is " + inMemClosedFiles[0]);
       } catch (Throwable t) {
-        LOG.warn("Merge of the inmemory files threw an exception: " + 
+        LOG.warn(reduceTask.getTaskId() +
+            " Final merge of the inmemory files threw an exception: " + 
             StringUtils.stringifyException(t));
-        inMemFileSys.close();
         return false;
       }
     }
     return mergeThrowable == null && numCopied == numOutputs && !killed;
     } finally {
+      inMemFileSys.close();
       pingTimer.interrupt();
     }
   }
@@ -780,15 +793,27 @@
         //output files to merge to get the benefit of in-memory merge
         if (inMemClosedFiles.length >= 
           (int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION)) {
-          RawKeyValueIterator rIter = sorter.merge(inMemClosedFiles, true, 
-              inMemClosedFiles.length, new Path(reduceTask.getTaskId()));
           //name this output file same as the name of the first file that is 
           //there in the current list of inmem files (this is guaranteed to be
           //absent on the disk currently. So we don't overwrite a prev. 
-          //created spill)
+          //created spill). Also we need to create the output file now since
+          //it is not guaranteed that this file will be present after merge
+          //is called (we delete empty sequence files as soon as we see them
+          //in the merge method)
           SequenceFile.Writer writer = sorter.cloneFileAttributes(
               inMemFileSys.makeQualified(inMemClosedFiles[0]), 
               localFileSys.makeQualified(inMemClosedFiles[0]), null);
+          RawKeyValueIterator rIter;
+          try {
+            rIter = sorter.merge(inMemClosedFiles, true, 
+              inMemClosedFiles.length, new Path(reduceTask.getTaskId()));
+          } catch (Exception e) { 
+            //make sure that we delete the ondisk file that we created earlier
+            //when we invoked cloneFileAttributes
+            writer.close();
+            localFileSys.delete(inMemClosedFiles[0]);
+            throw new IOException (StringUtils.stringifyException(e));
+          }
           sorter.writeFile(rIter, writer);
           writer.close();
           LOG.info(reduceTask.getTaskId() + 
@@ -801,7 +826,8 @@
               inMemFileSys.getUri());
         }
       } catch (Throwable t) {
-        LOG.warn("Merge of the inmemory files threw an exception: " + 
+        LOG.warn(reduceTask.getTaskId() +
+            " Intermediate Merge of the inmemory files threw an exception: " + 
             StringUtils.stringifyException(t));
         ReduceTaskRunner.this.mergeThrowable = t;
       }



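Both merge sites in ReduceTaskRunner now follow the same shape: the on-disk output file is created via cloneFileAttributes before the merge is attempted, because the in-memory source it is named after may be deleted during the merge; if the merge then throws, the writer is closed and the partially created local file is deleted instead of being left behind. A rough sketch of that pattern, with hypothetical helpers (openWriter, runMerge, writeMerged, deleteLocal) standing in for the sorter and filesystem calls:

    // Rough sketch of the create-then-merge-then-clean-up-on-failure pattern
    // used in both the intermediate and the final in-memory merges above.
    // openWriter(), runMerge(), writeMerged() and deleteLocal() are
    // hypothetical placeholders, not Hadoop APIs.
    import java.io.Closeable;
    import java.io.IOException;

    abstract class InMemMergeSketch {
      abstract Closeable openWriter(String outputFile) throws IOException;
      abstract Object runMerge(String[] inMemFiles) throws Exception;
      abstract void writeMerged(Object iter, Closeable writer) throws IOException;
      abstract void deleteLocal(String outputFile) throws IOException;

      void mergeAndSpill(String[] inMemFiles) throws Exception {
        // Create the on-disk target first: the in-memory source named here may
        // be gone after the merge (empty segments are now cleaned up eagerly).
        String outputFile = inMemFiles[0];
        Closeable writer = openWriter(outputFile);
        Object iter;
        try {
          iter = runMerge(inMemFiles);
        } catch (Exception e) {
          writer.close();             // don't leak the writer...
          deleteLocal(outputFile);    // ...or the half-created spill file
          throw new IOException("In-memory merge failed: " + e);
        }
        writeMerged(iter, writer);
        writer.close();
      }
    }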