hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject svn commit: r667041 [2/2] - in /hadoop/core/branches/branch-0.18: ./ docs/ src/core/org/apache/hadoop/io/compress/ src/docs/src/documentation/content/xdocs/ src/mapred/org/apache/hadoop/mapred/
Date Thu, 12 Jun 2008 10:47:53 GMT
Modified: hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=667041&r1=667040&r2=667041&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Thu Jun 12 03:47:52 2008
@@ -422,11 +422,6 @@
      */
     private volatile boolean exitLocalFSMerge = false;
     
-    /** 
-     * A flag to indicate when to exit InMemMerge
-     */
-    private volatile boolean exitInMemMerge = false;
-    
     /**
      * When we accumulate mergeThreshold number of files in ram, we merge/spill
      */
@@ -712,42 +707,60 @@
     class ShuffleRamManager implements RamManager {
       /* Maximum percentage of the in-memory limit that a single shuffle can 
        * consume*/ 
-      private static final float MAX_SINGLE_SHUFFLE_SEGMENT_PERCENT = 0.25f;
+      private static final float MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION = 0.25f;
       
-      private boolean closed = false;
+      /* Maximum percentage of shuffle-threads which can be stalled 
+       * simultaneously after which a merge is triggered. */ 
+      private static final float MAX_STALLED_SHUFFLE_THREADS_FRACTION = 0.75f;
       
-      volatile private int numClosed = 0;
-      volatile private int size = 0;
       private final int maxSize;
       private final int maxSingleShuffleLimit;
       
+      private int size = 0;
+      
       private Object dataAvailable = new Object();
-      private volatile int fullSize = 0;
+      private int fullSize = 0;
+      private int numPendingRequests = 0;
+      private int numRequiredMapOutputs = 0;
+      private int numClosed = 0;
+      private boolean closed = false;
       
       public ShuffleRamManager(Configuration conf) {
         maxSize = conf.getInt("fs.inmemory.size.mb", 100) * 1024 * 1024;
-        maxSingleShuffleLimit = (int)(maxSize * MAX_SINGLE_SHUFFLE_SEGMENT_PERCENT);
+        maxSingleShuffleLimit = (int)(maxSize * MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION);
         LOG.info("ShuffleRamManager: MemoryLimit=" + maxSize + 
                  ", MaxSingleShuffleLimit=" + maxSingleShuffleLimit);
       }
       
-      public synchronized boolean reserve(int requestedSize, InputStream in) {
+      public synchronized boolean reserve(int requestedSize, InputStream in) 
+      throws InterruptedException {
+        // Wait till the request can be fulfilled...
         while ((size + requestedSize) > maxSize) {
-          try {
-            // Close the connection
-            if (in != null) {
-              try {
-                in.close();
-              } catch (IOException ie) {
-                LOG.info("Failed to close connection with: " + ie);
-              } finally {
-                in = null;
-              }
+          
+          // Close the input...
+          if (in != null) {
+            try {
+              in.close();
+            } catch (IOException ie) {
+              LOG.info("Failed to close connection with: " + ie);
+            } finally {
+              in = null;
             }
-            
-            // Wait for memory to free up
-            wait();
-          } catch (InterruptedException ie) {}
+          } 
+
+          // Track pending requests
+          synchronized (dataAvailable) {
+            ++numPendingRequests;
+            dataAvailable.notify();
+          }
+
+          // Wait for memory to free up
+          wait();
+          
+          // Track pending requests
+          synchronized (dataAvailable) {
+            --numPendingRequests;
+          }
         }
         
         size += requestedSize;
@@ -767,20 +780,25 @@
         notifyAll();
       }
       
-      public void waitForDataToMerge() {
+      public boolean waitForDataToMerge() throws InterruptedException {
+        boolean done = false;
         synchronized (dataAvailable) {
           while (!closed &&
                  (getPercentUsed() < MAX_INMEM_FILESYS_USE ||
-                  getReservedFiles() < 
+                  numClosed < 
                     (int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION)
                  ) 
                  &&
-                 (mergeThreshold <= 0 || getReservedFiles() < mergeThreshold)) {
-            try {
-              dataAvailable.wait();
-            } catch (InterruptedException ie) {}
+                 (mergeThreshold <= 0 || numClosed < mergeThreshold) 
+                 && 
+                 (numPendingRequests < 
+                      numCopiers*MAX_STALLED_SHUFFLE_THREADS_FRACTION && 
+                   numPendingRequests < numRequiredMapOutputs)) {
+            dataAvailable.wait();
           }
+          done = closed;
         }
+        return done;
       }
       
       public void closeInMemoryFile(int requestedSize) {
@@ -791,6 +809,13 @@
         }
       }
       
+      public void setNumCopiedMapOutputs(int numRequiredMapOutputs) {
+        synchronized (dataAvailable) {
+          this.numRequiredMapOutputs = numRequiredMapOutputs;
+          dataAvailable.notify();
+        }
+      }
+      
       public void close() {
         synchronized (dataAvailable) {
           closed = true;
@@ -799,14 +824,10 @@
         }
       }
       
-      float getPercentUsed() {
+      private float getPercentUsed() {
         return (float)fullSize/maxSize;
       }
-      
-      int getReservedFiles() {
-        return numClosed;
-      }
-      
+
       int getMemoryLimit() {
         return maxSize;
       }
@@ -978,7 +999,8 @@
             }
             
             // Note that we successfully copied the map-output
-            copiedMapOutputs.add(loc.getTaskId());
+            noteCopiedMapOutput(loc.getTaskId());
+            
             return bytes;
           }
           
@@ -1004,12 +1026,22 @@
           }
 
           // Note that we successfully copied the map-output
-          copiedMapOutputs.add(loc.getTaskId());
+          noteCopiedMapOutput(loc.getTaskId());
         }
         
         return bytes;
       }
       
+      /**
+       * Save the map taskid whose output we just copied.
+       * This function assumes that it has been synchronized on ReduceTask.this.
+       * 
+       * @param taskId map taskid
+       */
+      private void noteCopiedMapOutput(TaskID taskId) {
+        copiedMapOutputs.add(taskId);
+        ramManager.setNumCopiedMapOutputs(numMaps - copiedMapOutputs.size());
+      }
 
       /**
        * Get the map output into a local file (either in the inmemory fs or on the 
@@ -1248,6 +1280,7 @@
       this.shuffleClientMetrics = new ShuffleClientMetrics(conf);
       this.umbilical = umbilical;      
       this.reduceTask = ReduceTask.this;
+
       this.scheduledCopies = new ArrayList<MapOutputLocation>(100);
       this.copyResults = new ArrayList<CopyResult>(100);    
       this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5);
@@ -1304,7 +1337,6 @@
     
     @SuppressWarnings("unchecked")
     public boolean fetchOutputs() throws IOException {
-      final int      numOutputs = reduceTask.getNumMaps();
       List<MapOutputLocation> knownOutputs = 
         new ArrayList<MapOutputLocation>(numCopiers);
       int totalFailures = 0;
@@ -1316,7 +1348,7 @@
       LocalFSMerger localFSMergerThread = null;
       InMemFSMergeThread inMemFSMergeThread = null;
       
-      for (int i = 0; i < numOutputs; i++) {
+      for (int i = 0; i < numMaps; i++) {
         copyPhase.addPhase();       // add sub-phase per file
       }
       
@@ -1346,7 +1378,7 @@
       IntWritable fromEventId = new IntWritable(0);
       
         // loop until we get all required outputs
-        while (copiedMapOutputs.size() < numOutputs && mergeThrowable == null) {
+        while (copiedMapOutputs.size() < numMaps && mergeThrowable == null) {
           
           currentTime = System.currentTimeMillis();
           boolean logNow = false;
@@ -1356,7 +1388,7 @@
           }
           if (logNow) {
             LOG.info(reduceTask.getTaskID() + " Need another " 
-                   + (numOutputs - copiedMapOutputs.size()) + " map output(s) "
+                   + (numMaps - copiedMapOutputs.size()) + " map output(s) "
                    + "where " + numInFlight + " is already in progress");
           }
           
@@ -1503,7 +1535,7 @@
               float transferRate = mbs/secsSinceStart;
                 
               copyPhase.startNextPhase();
-              copyPhase.setStatus("copy (" + numCopied + " of " + numOutputs 
+              copyPhase.setStatus("copy (" + numCopied + " of " + numMaps 
                                   + " at " +
                                   mbpsFormat.format(transferRate) +  " MB/s)");
                 
@@ -1640,7 +1672,6 @@
           mapOutputFilesOnDisk.notify();
         }
         
-        exitInMemMerge = true;
         ramManager.close();
         
         //Do a merge of in-memory files (if there are any)
@@ -1648,9 +1679,13 @@
           try {
             // Wait for the on-disk merge to complete
             localFSMergerThread.join();
+            LOG.info("Interleaved on-disk merge complete: " + 
+                     mapOutputFilesOnDisk.size() + " files left.");
             
             //wait for an ongoing merge (if it is in flight) to complete
             inMemFSMergeThread.join();
+            LOG.info("In-memory merge complete: " + 
+                     mapOutputsFilesInMemory.size() + " files left.");
             } catch (Throwable t) {
             LOG.warn(reduceTask.getTaskID() +
                      " Final merge of the inmemory files threw an exception: " + 
@@ -1662,7 +1697,7 @@
             return false;
           }
         }
-        return mergeThrowable == null && copiedMapOutputs.size() == numOutputs;
+        return mergeThrowable == null && copiedMapOutputs.size() == numMaps;
     }
     
     private List<Segment<K, V>> createInMemorySegments() {
@@ -1908,10 +1943,11 @@
       public void run() {
         LOG.info(reduceTask.getTaskID() + " Thread started: " + getName());
         try {
-          while (!exitInMemMerge) {
-            ramManager.waitForDataToMerge();
+          boolean exit = false;
+          do {
+            exit = ramManager.waitForDataToMerge();
             doInMemMerge();
-          }
+          } while (!exit);
         } catch (Throwable t) {
           LOG.warn(reduceTask.getTaskID() +
                    " Merge of the inmemory files threw an exception: "
@@ -1923,7 +1959,6 @@
       @SuppressWarnings("unchecked")
       private void doInMemMerge() throws IOException{
         if (mapOutputsFilesInMemory.size() == 0) {
-          LOG.info("Noting to merge... ");
           return;
         }
         
@@ -1953,12 +1988,16 @@
         RawKeyValueIterator rIter = null;
         final Reporter reporter = getReporter(umbilical);
         try {
+          LOG.info("Initiating in-memory merge with " + noInMemorySegments + 
+                   " segments...");
+          
           rIter = Merger.merge(conf, localFileSys,
                                (Class<K>)conf.getMapOutputKeyClass(),
                                (Class<V>)conf.getMapOutputValueClass(),
                                inMemorySegments, inMemorySegments.size(),
                                new Path(reduceTask.getTaskID().toString()),
                                conf.getOutputKeyComparator(), reporter);
+          
           if (null == combinerClass) {
             Merger.writeFile(rIter, writer, reporter);
           } else {
@@ -1966,6 +2005,12 @@
             combineAndSpill(rIter, reduceCombineInputCounter);
           }
           writer.close();
+
+          LOG.info(reduceTask.getTaskID() + 
+              " Merge of the " + noInMemorySegments +
+              " files in-memory complete." +
+              " Local file is " + outputPath + " of size " + 
+              localFileSys.getFileStatus(outputPath).getLen());
         } catch (Exception e) { 
           //make sure that we delete the ondisk file that we created 
           //earlier when we invoked cloneFileAttributes
@@ -1973,12 +2018,8 @@
           throw (IOException)new IOException
                   ("Intermedate merge failed").initCause(e);
         }
-        LOG.info(reduceTask.getTaskID() + 
-                 " Merge of the " + noInMemorySegments +
-                 " files in-memory complete." +
-                 " Local file is " + outputPath + " of size " + 
-                 localFileSys.getFileStatus(outputPath).getLen());
-        
+
+        // Note the output of the merge
         FileStatus status = localFileSys.getFileStatus(outputPath);
         synchronized (mapOutputFilesOnDisk) {
           addToMapOutputFilesOnDisk(status);



Mime
View raw message