hadoop-mapreduce-commits mailing list archives

From: d...@apache.org
Subject: svn commit: r810944 [3/4] - in /hadoop/mapreduce/trunk: ./ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/task/ src/java/org/apache/hadoop/mapreduce/task/reduce/...
Date: Thu, 03 Sep 2009 13:56:22 GMT
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java?rev=810944&r1=810943&r2=810944&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java Thu Sep  3 13:56:21 2009
@@ -105,7 +105,7 @@
   }
   
   @Override
-  void addFetchFailedMap(TaskAttemptID mapTaskId) {
+  public void addFetchFailedMap(TaskAttemptID mapTaskId) {
     failedFetchTasks.add(mapTaskId);
   }
   

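With addFetchFailedMap now public on ReduceTaskStatus (and abstract on TaskStatus, below), fetch failures can be recorded on a reduce attempt's status from outside org.apache.hadoop.mapred. A minimal illustrative sketch, not part of this commit; it assumes TaskStatus is visible to the caller, and the attempt id string is hypothetical:

import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.TaskStatus;

class FetchFailureExample {
  // Record that fetching the output of the given map attempt failed, so the
  // TaskTracker can report it to the JobTracker on its next heartbeat.
  static void recordFailure(TaskStatus reduceStatus, String failedMapAttempt) {
    reduceStatus.addFetchFailedMap(TaskAttemptID.forName(failedMapAttempt));
  }
}
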
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=810944&r1=810943&r2=810944&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java Thu Sep  3 13:56:21 2009
@@ -61,6 +61,30 @@
   private static final Log LOG =
     LogFactory.getLog(Task.class);
 
+  // Counters used by Task subclasses
+  protected static enum Counter { 
+    MAP_INPUT_RECORDS, 
+    MAP_OUTPUT_RECORDS,
+    MAP_SKIPPED_RECORDS,
+    MAP_INPUT_BYTES, 
+    MAP_OUTPUT_BYTES,
+    COMBINE_INPUT_RECORDS,
+    COMBINE_OUTPUT_RECORDS,
+    REDUCE_INPUT_GROUPS,
+    REDUCE_SHUFFLE_BYTES,
+    REDUCE_INPUT_RECORDS,
+    REDUCE_OUTPUT_RECORDS,
+    REDUCE_SKIPPED_GROUPS,
+    REDUCE_SKIPPED_RECORDS,
+    SPILLED_RECORDS,
+    FAILED_SHUFFLE,
+    SHUFFLED_MAPS,
+    MERGED_MAP_OUTPUTS,
+  }
+  
+  public static String MERGED_OUTPUT_PREFIX = ".merged";
+  
+
   /**
    * Counters to measure the usage of the different file systems.
    * Always return the String array with two elements. First one is the name of  
@@ -123,6 +147,8 @@
   protected org.apache.hadoop.mapreduce.OutputFormat<?,?> outputFormat;
   protected org.apache.hadoop.mapreduce.OutputCommitter committer;
   protected final Counters.Counter spilledRecordsCounter;
+  protected final Counters.Counter failedShuffleCounter;
+  protected final Counters.Counter mergedMapOutputsCounter;
   private int numSlotsRequired;
   protected TaskUmbilicalProtocol umbilical;
 
@@ -134,6 +160,8 @@
     taskStatus = TaskStatus.createTaskStatus(isMapTask());
     taskId = new TaskAttemptID();
     spilledRecordsCounter = counters.findCounter(TaskCounter.SPILLED_RECORDS);
+    failedShuffleCounter = counters.findCounter(Counter.FAILED_SHUFFLE);
+    mergedMapOutputsCounter = counters.findCounter(Counter.MERGED_MAP_OUTPUTS);
   }
 
   public Task(String jobFile, TaskAttemptID taskId, int partition, 
@@ -152,6 +180,8 @@
                                                     TaskStatus.Phase.SHUFFLE, 
                                                   counters);
     spilledRecordsCounter = counters.findCounter(TaskCounter.SPILLED_RECORDS);
+    failedShuffleCounter = counters.findCounter(Counter.FAILED_SHUFFLE);
+    mergedMapOutputsCounter = counters.findCounter(Counter.MERGED_MAP_OUTPUTS);
   }
 
   ////////////////////////////////////////////
@@ -693,7 +723,12 @@
     sendDone(umbilical);
   }
 
-  protected void statusUpdate(TaskUmbilicalProtocol umbilical) 
+  /**
+   * Send a status update to the task tracker
+   * @param umbilical
+   * @throws IOException
+   */
+  public void statusUpdate(TaskUmbilicalProtocol umbilical) 
   throws IOException {
     int retries = MAX_RETRIES;
     while (true) {
@@ -860,7 +895,7 @@
   /**
    * OutputCollector for the combiner.
    */
-  protected static class CombineOutputCollector<K extends Object, V extends Object> 
+  public static class CombineOutputCollector<K extends Object, V extends Object> 
   implements OutputCollector<K, V> {
     private Writer<K, V> writer;
     private Counters.Counter outCounter;
@@ -938,7 +973,7 @@
     /// Auxiliary methods
 
     /** Start processing next unique key. */
-    void nextKey() throws IOException {
+    public void nextKey() throws IOException {
       // read until we find a new key
       while (hasNext) { 
         readNextKey();
@@ -953,12 +988,12 @@
     }
 
     /** True iff more keys remain. */
-    boolean more() { 
+    public boolean more() { 
       return more; 
     }
 
     /** The current key. */
-    KEY getKey() { 
+    public KEY getKey() { 
       return key; 
     }
 
@@ -988,7 +1023,8 @@
     }
   }
 
-  protected static class CombineValuesIterator<KEY,VALUE>
+    /** Iterator to return Combined values */
+  public static class CombineValuesIterator<KEY,VALUE>
       extends ValuesIterator<KEY,VALUE> {
 
     private final Counters.Counter combineInputCounter;

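The FAILED_SHUFFLE and MERGED_MAP_OUTPUTS entries added above are ordinary framework counters; the constructors bind them to the protected failedShuffleCounter and mergedMapOutputsCounter fields so shuffle code can update them. A small sketch of such updates, assuming only the existing Counters.Counter.increment(long) API (illustrative, not part of this commit):

import org.apache.hadoop.mapred.Counters;

class ShuffleCounterUpdates {
  // Bump the counter bound to Counter.FAILED_SHUFFLE when a map-output fetch fails.
  static void onFetchFailed(Counters.Counter failedShuffleCounter) {
    failedShuffleCounter.increment(1);
  }

  // Bump the counter bound to Counter.MERGED_MAP_OUTPUTS for each merged map output.
  static void onMapOutputMerged(Counters.Counter mergedMapOutputsCounter) {
    mergedMapOutputsCounter.increment(1);
  }
}
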
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java?rev=810944&r1=810943&r2=810944&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java Thu Sep  3 13:56:21 2009
@@ -219,7 +219,7 @@
    * Set current phase of this task.  
    * @param phase phase of this task
    */
-  void setPhase(Phase phase){
+  public void setPhase(Phase phase){
     TaskStatus.Phase oldPhase = getPhase();
     if (oldPhase != phase){
       // sort phase started
@@ -294,7 +294,7 @@
    *  
    * @param mapTaskId map from which fetch failed
    */
-  synchronized void addFetchFailedMap(TaskAttemptID mapTaskId) {}
+  public abstract void addFetchFailedMap(TaskAttemptID mapTaskId);
 
   /**
    * Update the status of the task.

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=810944&r1=810943&r2=810944&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Thu Sep  3 13:56:21 2009
@@ -17,10 +17,10 @@
  */
  package org.apache.hadoop.mapred;
 
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.OutputStream;
 import java.io.RandomAccessFile;
 import java.net.InetSocketAddress;
 import java.net.URI;
@@ -36,6 +36,7 @@
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.StringTokenizer;
 import java.util.TreeMap;
 import java.util.Vector;
 import java.util.concurrent.BlockingQueue;
@@ -70,6 +71,7 @@
 import org.apache.hadoop.mapred.pipes.Submitter;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
+import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsException;
 import org.apache.hadoop.metrics.MetricsRecord;
@@ -126,13 +128,13 @@
     LogFactory.getLog(TaskTracker.class);
 
   public static final String MR_CLIENTTRACE_FORMAT =
-        "src: %s" +     // src IP
-        ", dest: %s" +  // dst IP
-        ", bytes: %s" + // byte count
-        ", op: %s" +    // operation
-        ", cliID: %s" + // task id
-        ", reduceID: %s" + // reduce id
-        ", duration: %s"; // duration
+    "src: %s" +     // src IP
+    ", dest: %s" +  // dst IP
+    ", maps: %s" + // number of maps
+    ", op: %s" +    // operation
+    ", reduceID: %s" + // reduce id
+    ", duration: %s"; // duration
+
   public static final Log ClientTraceLog =
     LogFactory.getLog(TaskTracker.class.getName() + ".clienttrace");
 
@@ -147,7 +149,7 @@
 
   Server taskReportServer = null;
   InterTrackerProtocol jobClient;
-  
+
   private TrackerDistributedCacheManager distributedCacheManager;
     
   // last heartbeat response recieved
@@ -402,7 +404,7 @@
     return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.DISTCACHEDIR;
   }
 
-  static String getJobCacheSubdir() {
+  public static String getJobCacheSubdir() {
     return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.JOBCACHE;
   }
 
@@ -3144,145 +3146,183 @@
     public void doGet(HttpServletRequest request, 
                       HttpServletResponse response
                       ) throws ServletException, IOException {
-      TaskAttemptID reduceAttId = null;
-      String mapId = request.getParameter("map");
+      long start = System.currentTimeMillis();
+      String mapIds = request.getParameter("map");
       String reduceId = request.getParameter("reduce");
       String jobId = request.getParameter("job");
 
+      LOG.debug("Shuffle started for maps (mapIds=" + mapIds + ") to reduce " + 
+               reduceId);
+
       if (jobId == null) {
         throw new IOException("job parameter is required");
       }
 
-      if (mapId == null || reduceId == null) {
+      if (mapIds == null || reduceId == null) {
         throw new IOException("map and reduce parameters are required");
       }
-      try {
-        reduceAttId = TaskAttemptID.forName(reduceId);
-      } catch (IllegalArgumentException e) {
-        throw new IOException("reduce attempt ID is malformed");
-      }
+      
       ServletContext context = getServletContext();
-      int reduce = reduceAttId.getTaskID().getId();
-      byte[] buffer = new byte[MAX_BYTES_TO_READ];
-      // true iff IOException was caused by attempt to access input
-      boolean isInputException = true;
-      OutputStream outStream = null;
-      FSDataInputStream mapOutputIn = null;
+      int reduce = Integer.parseInt(reduceId);
+      DataOutputStream outStream = null;
  
-      long totalRead = 0;
       ShuffleServerMetrics shuffleMetrics =
         (ShuffleServerMetrics) context.getAttribute("shuffleServerMetrics");
       TaskTracker tracker = 
         (TaskTracker) context.getAttribute("task.tracker");
 
-      long startTime = 0;
+      int numMaps = 0;
       try {
         shuffleMetrics.serverHandlerBusy();
-        if(ClientTraceLog.isInfoEnabled())
-          startTime = System.nanoTime();
-        outStream = response.getOutputStream();
+        outStream = new DataOutputStream(response.getOutputStream());
+        //use the same buffersize as used for reading the data from disk
+        response.setBufferSize(MAX_BYTES_TO_READ);
         JobConf conf = (JobConf) context.getAttribute("conf");
         LocalDirAllocator lDirAlloc = 
           (LocalDirAllocator)context.getAttribute("localDirAllocator");
         FileSystem rfs = ((LocalFileSystem)
             context.getAttribute("local.file.system")).getRaw();
 
-        // Index file
-        Path indexFileName = lDirAlloc.getLocalPathToRead(
-            TaskTracker.getIntermediateOutputDir(jobId, mapId)
-            + "/file.out.index", conf);
-        
-        // Map-output file
-        Path mapOutputFileName = lDirAlloc.getLocalPathToRead(
-            TaskTracker.getIntermediateOutputDir(jobId, mapId)
-            + "/file.out", conf);
+        // Split the map ids, send output for one map at a time
+        StringTokenizer itr = new StringTokenizer(mapIds, ",");
+        while(itr.hasMoreTokens()) {
+          String mapId = itr.nextToken();
+          ++numMaps;
+          sendMapFile(jobId, mapId, reduce, conf, outStream,
+                      tracker, lDirAlloc, shuffleMetrics, rfs);
+        }
+      } catch (IOException ie) {
+        Log log = (Log) context.getAttribute("log");
+        String errorMsg = ("getMapOutputs(" + mapIds + "," + reduceId + 
+                           ") failed");
+        log.warn(errorMsg, ie);
+        response.sendError(HttpServletResponse.SC_GONE, errorMsg);
+        shuffleMetrics.failedOutput();
+        throw ie;
+      } finally {
+        shuffleMetrics.serverHandlerFree();
+      }
+      outStream.close();
+      shuffleMetrics.successOutput();
+      long timeElapsed = (System.currentTimeMillis()-start);
+      LOG.info("Shuffled " + numMaps
+          + "maps (mapIds=" + mapIds + ") to reduce "
+          + reduceId + " in " + timeElapsed + "s");
+
+      if (ClientTraceLog.isInfoEnabled()) {
+        ClientTraceLog.info(String.format(MR_CLIENTTRACE_FORMAT,
+            request.getLocalAddr() + ":" + request.getLocalPort(),
+            request.getRemoteAddr() + ":" + request.getRemotePort(),
+            numMaps, "MAPRED_SHUFFLE", reduceId,
+            timeElapsed));
+      }
+    }
 
+    private void sendMapFile(String jobId, String mapId,
+                             int reduce,
+                             Configuration conf,
+                             DataOutputStream outStream,
+                             TaskTracker tracker,
+                             LocalDirAllocator lDirAlloc,
+                             ShuffleServerMetrics shuffleMetrics,
+                             FileSystem localfs
+                             ) throws IOException {
+      
+      LOG.debug("sendMapFile called for " + mapId + " to reduce " + reduce);
+      
+      // true iff IOException was caused by attempt to access input
+      boolean isInputException = false;
+      FSDataInputStream mapOutputIn = null;
+      byte[] buffer = new byte[MAX_BYTES_TO_READ];
+      long totalRead = 0;
+
+      // Index file
+      Path indexFileName = lDirAlloc.getLocalPathToRead(
+          TaskTracker.getIntermediateOutputDir(jobId, mapId)
+          + "/file.out.index", conf);
+      
+      // Map-output file
+      Path mapOutputFileName = lDirAlloc.getLocalPathToRead(
+          TaskTracker.getIntermediateOutputDir(jobId, mapId)
+          + "/file.out", conf);
+
+      /**
+       * Read the index file to get the information about where
+       * the map-output for the given reducer is available. 
+       */
+      IndexRecord info = 
+        tracker.indexCache.getIndexInformation(mapId, reduce, indexFileName);
+      
+      try {
         /**
-         * Read the index file to get the information about where
-         * the map-output for the given reducer is available. 
-         */
-        IndexRecord info = 
-          tracker.indexCache.getIndexInformation(mapId, reduce,indexFileName);
-          
-        //set the custom "from-map-task" http header to the map task from which
-        //the map output data is being transferred
-        response.setHeader(FROM_MAP_TASK, mapId);
-        
-        //set the custom "Raw-Map-Output-Length" http header to 
-        //the raw (decompressed) length
-        response.setHeader(RAW_MAP_OUTPUT_LENGTH,
-            Long.toString(info.rawLength));
-
-        //set the custom "Map-Output-Length" http header to 
-        //the actual number of bytes being transferred
-        response.setHeader(MAP_OUTPUT_LENGTH,
-            Long.toString(info.partLength));
-
-        //set the custom "for-reduce-task" http header to the reduce task number
-        //for which this map output is being transferred
-        response.setHeader(FOR_REDUCE_TASK, Integer.toString(reduce));
-        
-        //use the same buffersize as used for reading the data from disk
-        response.setBufferSize(MAX_BYTES_TO_READ);
-        
-        /**
-         * Read the data from the sigle map-output file and
+         * Read the data from the single map-output file and
          * send it to the reducer.
          */
         //open the map-output file
-        mapOutputIn = rfs.open(mapOutputFileName);
-
+        mapOutputIn = localfs.open(mapOutputFileName);
         //seek to the correct offset for the reduce
         mapOutputIn.seek(info.startOffset);
+        
+        // write header for each map output
+        ShuffleHeader header = new ShuffleHeader(mapId, info.partLength,
+            info.rawLength, reduce);
+        header.write(outStream);
+
+        // read the map-output and stream it out
+        isInputException = true;
         long rem = info.partLength;
+        if (rem == 0) {
+          throw new IOException("Illegal partLength of 0 for mapId " + mapId + 
+                                " to reduce " + reduce);
+        }
         int len =
           mapOutputIn.read(buffer, 0, (int)Math.min(rem, MAX_BYTES_TO_READ));
-        while (rem > 0 && len >= 0) {
+        long now = 0;
+        while (len >= 0) {
           rem -= len;
           try {
             shuffleMetrics.outputBytes(len);
-            outStream.write(buffer, 0, len);
-            outStream.flush();
+            
+            if (len > 0) {
+              outStream.write(buffer, 0, len);
+            } else {
+              LOG.info("Skipped zero-length read of map " + mapId + 
+                       " to reduce " + reduce);
+            }
+            
           } catch (IOException ie) {
             isInputException = false;
             throw ie;
           }
           totalRead += len;
+          if (rem == 0) {
+            break;
+          }
           len =
             mapOutputIn.read(buffer, 0, (int)Math.min(rem, MAX_BYTES_TO_READ));
         }
-
-        LOG.info("Sent out " + totalRead + " bytes for reduce: " + reduce + 
-                 " from map: " + mapId + " given " + info.partLength + "/" + 
-                 info.rawLength);
+        
+        mapOutputIn.close();
       } catch (IOException ie) {
-        Log log = (Log) context.getAttribute("log");
-        String errorMsg = ("getMapOutput(" + mapId + "," + reduceId + 
-                           ") failed :\n"+
-                           StringUtils.stringifyException(ie));
-        log.warn(errorMsg);
+        String errorMsg = "error on sending map " + mapId + " to reduce " + 
+                          reduce;
         if (isInputException) {
-          tracker.mapOutputLost(TaskAttemptID.forName(mapId), errorMsg);
-        }
-        response.sendError(HttpServletResponse.SC_GONE, errorMsg);
-        shuffleMetrics.failedOutput();
-        throw ie;
-      } finally {
-        if (null != mapOutputIn) {
-          mapOutputIn.close();
+          tracker.mapOutputLost(TaskAttemptID.forName(mapId), errorMsg + 
+                                StringUtils.stringifyException(ie));
         }
-        final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
-        shuffleMetrics.serverHandlerFree();
-        if (ClientTraceLog.isInfoEnabled()) {
-          ClientTraceLog.info(String.format(MR_CLIENTTRACE_FORMAT,
-                request.getLocalAddr() + ":" + request.getLocalPort(),
-                request.getRemoteAddr() + ":" + request.getRemotePort(),
-                totalRead, "MAPRED_SHUFFLE", mapId, reduceId,
-                endTime-startTime));
+        if (mapOutputIn != null) {
+          try {
+            mapOutputIn.close();
+          } catch (IOException ioe) {
+            LOG.info("problem closing map output file", ioe);
+          }
         }
+        throw new IOException(errorMsg, ie);
       }
-      outStream.close();
-      shuffleMetrics.successOutput();
+      
+      LOG.info("Sent out " + totalRead + " bytes to reduce " + reduce + 
+          " from map: " + mapId + " given " + info.partLength + "/" + 
+          info.rawLength);
     }
   }
 

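The rewritten servlet accepts a comma-separated list of map attempt ids in the "map" parameter and streams each output prefixed by a ShuffleHeader, so one HTTP response can carry several map outputs. A hedged sketch of the matching client-side framing (field access mirrors the in-package Fetcher code added below; a real caller would live in the org.apache.hadoop.mapreduce.task.reduce package):

import java.io.DataInputStream;
import java.io.IOException;

import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;

class ShuffleResponseReader {
  // Each frame is a ShuffleHeader followed by compressedLength payload bytes.
  static void drain(DataInputStream in, int expectedMaps) throws IOException {
    for (int i = 0; i < expectedMaps; i++) {
      ShuffleHeader header = new ShuffleHeader();
      header.readFields(in);
      long remaining = header.compressedLength;   // payload size for this map
      while (remaining > 0) {
        long skipped = in.skip(remaining);
        if (skipped <= 0) {
          throw new IOException("unexpected end of shuffle stream");
        }
        remaining -= skipped;
      }
    }
  }
}
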
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=810944&r1=810943&r2=810944&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Thu Sep  3 13:56:21 2009
@@ -27,7 +27,7 @@
  * parent is a daemon which which polls the central master for a new map or
  * reduce task and runs it as a child process.  All communication between child
  * and parent is via this protocol. */ 
-interface TaskUmbilicalProtocol extends VersionedProtocol {
+public interface TaskUmbilicalProtocol extends VersionedProtocol {
 
   /** 
    * Changed the version to 2, since we have a new method getMapOutputs 
@@ -142,7 +142,6 @@
    * task-tracker has changed or not. This will trigger some action at the 
    * child-process.
    *
-   * @param taskId the reduce task id
    * @param fromIndex the index starting from which the locations should be 
    * fetched
    * @param maxLocs the max number of locations to fetch

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java?rev=810944&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java Thu Sep  3 13:56:21 2009
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.task.reduce;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.MapTaskCompletionEventsUpdate;
+import org.apache.hadoop.mapred.TaskCompletionEvent;
+import org.apache.hadoop.mapred.TaskTracker;
+import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+class EventFetcher<K,V> extends Thread {
+  private static final long SLEEP_TIME = 1000;
+  private static final int MAX_EVENTS_TO_FETCH = 10000;
+  private static final int MAX_RETRIES = 10;
+  private static final int RETRY_PERIOD = 5000;
+  private static final Log LOG = LogFactory.getLog(EventFetcher.class);
+
+  private final TaskAttemptID reduce;
+  private final TaskUmbilicalProtocol umbilical;
+  private final ShuffleScheduler<K,V> scheduler;
+  private int fromEventId = 0;
+  private ExceptionReporter exceptionReporter = null;
+  
+  private int maxMapRuntime = 0;
+  
+  public EventFetcher(TaskAttemptID reduce,
+                      TaskUmbilicalProtocol umbilical,
+                      ShuffleScheduler<K,V> scheduler,
+                      ExceptionReporter reporter) {
+    setName("EventFetcher for fetching Map Completion Events");
+    setDaemon(true);    
+    this.reduce = reduce;
+    this.umbilical = umbilical;
+    this.scheduler = scheduler;
+    exceptionReporter = reporter;
+  }
+
+  @Override
+  public void run() {
+    int failures = 0;
+    LOG.info(reduce + " Thread started: " + getName());
+    
+    try {
+      while (true) {
+        try {
+          int numNewMaps = getMapCompletionEvents();
+          failures = 0;
+          if (numNewMaps > 0) {
+            LOG.info(reduce + ": " + "Got " + numNewMaps + " new map-outputs");
+          }
+          LOG.debug("GetMapEventsThread about to sleep for " + SLEEP_TIME);
+          Thread.sleep(SLEEP_TIME);
+        } catch (IOException ie) {
+          LOG.info("Exception in getting events", ie);
+          // check to see whether to abort
+          if (++failures >= MAX_RETRIES) {
+            throw new IOException("too many failures downloading events", ie);
+          }
+          // sleep for a bit
+          Thread.sleep(RETRY_PERIOD);
+        }
+      }
+    } catch (InterruptedException e) {
+      return;
+    } catch (Throwable t) {
+      exceptionReporter.reportException(t);
+      return;
+    }
+  }
+  
+  /** 
+   * Queries the {@link TaskTracker} for a set of map-completion events 
+   * from a given event ID.
+   * @throws IOException
+   */  
+  private int getMapCompletionEvents() throws IOException {
+    
+    int numNewMaps = 0;
+    
+    MapTaskCompletionEventsUpdate update = 
+      umbilical.getMapCompletionEvents((org.apache.hadoop.mapred.JobID)
+                                       reduce.getJobID(), 
+                                       fromEventId, 
+                                       MAX_EVENTS_TO_FETCH,
+                                       (org.apache.hadoop.mapred.TaskAttemptID)
+                                         reduce);
+    TaskCompletionEvent events[] = update.getMapTaskCompletionEvents();
+    LOG.debug("Got " + events.length + " map completion events from " + 
+             fromEventId);
+      
+    // Check if the reset is required.
+    // Since there is no ordering of the task completion events at the 
+    // reducer, the only option to sync with the new jobtracker is to reset 
+    // the events index
+    if (update.shouldReset()) {
+      fromEventId = 0;
+      scheduler.resetKnownMaps();
+    }
+    
+    // Update the last seen event ID
+    fromEventId += events.length;
+    
+    // Process the TaskCompletionEvents:
+    // 1. Save the SUCCEEDED maps in knownOutputs to fetch the outputs.
+    // 2. Save the OBSOLETE/FAILED/KILLED maps in obsoleteOutputs to stop 
+    //    fetching from those maps.
+    // 3. Remove TIPFAILED maps from neededOutputs since we don't need their
+    //    outputs at all.
+    for (TaskCompletionEvent event : events) {
+      switch (event.getTaskStatus()) {
+        case SUCCEEDED:
+          URI u = getBaseURI(event.getTaskTrackerHttp());
+          scheduler.addKnownMapOutput(u.getHost() + ":" + u.getPort(),
+                                      u.toString(),
+                                      event.getTaskAttemptId());
+          numNewMaps ++;
+          int duration = event.getTaskRunTime();
+          if (duration > maxMapRuntime) {
+            maxMapRuntime = duration;
+            scheduler.informMaxMapRunTime(maxMapRuntime);
+          }
+          break;
+        case FAILED:
+        case KILLED:
+          break;
+        case OBSOLETE:
+          scheduler.obsoleteMapOutput(event.getTaskAttemptId());
+          LOG.info("Ignoring obsolete output of " + event.getTaskStatus() + 
+                   " map-task: '" + event.getTaskAttemptId() + "'");
+          break;
+        case TIPFAILED:
+          scheduler.tipFailed(event.getTaskAttemptId().getTaskID());
+          LOG.info("Ignoring output of failed map TIP: '" +  
+               event.getTaskAttemptId() + "'");
+          break;
+      }
+    }
+    return numNewMaps;
+  }
+  
+  private URI getBaseURI(String url) {
+    StringBuffer baseUrl = new StringBuffer(url);
+    if (!url.endsWith("/")) {
+      baseUrl.append("/");
+    }
+    baseUrl.append("mapOutput?job=");
+    baseUrl.append(reduce.getJobID());
+    baseUrl.append("&reduce=");
+    baseUrl.append(reduce.getTaskID().getId());
+    baseUrl.append("&map=");
+    URI u = URI.create(baseUrl.toString());
+    return u;
+  }
+}
\ No newline at end of file

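EventFetcher is a daemon thread that polls the umbilical every SLEEP_TIME (one second) for new map-completion events and registers them with the ShuffleScheduler; its run() loop exits when the thread is interrupted. A rough lifecycle sketch from the shuffle driver's point of view (the driver itself is elsewhere in this commit; names and ordering here are illustrative):

// Inside a shuffle driver with type parameters <K,V>, holding the reduce
// TaskAttemptID, the TaskUmbilicalProtocol, a ShuffleScheduler<K,V> and an
// ExceptionReporter:
EventFetcher<K, V> eventFetcher =
    new EventFetcher<K, V>(reduceId, umbilical, scheduler, exceptionReporter);
eventFetcher.start();                 // begin polling for map-completion events

// ... fetchers copy map outputs while new events keep arriving ...

eventFetcher.interrupt();             // run() returns on InterruptedException
try {
  eventFetcher.join();                // wait for the polling thread to exit
} catch (InterruptedException ie) {
  Thread.currentThread().interrupt();
}
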
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/ExceptionReporter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/ExceptionReporter.java?rev=810944&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/ExceptionReporter.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/ExceptionReporter.java Thu Sep  3 13:56:21 2009
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.task.reduce;
+
+/**
+ * An interface for reporting exceptions to other threads
+ */
+interface ExceptionReporter {
+  void reportException(Throwable t);
+}

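ExceptionReporter lets the fetcher and event-fetcher threads hand failures back to whatever thread is driving the shuffle. One possible implementation, purely illustrative (the real consumer of these reports is elsewhere in this commit); it remembers the first failure and interrupts the driving thread:

package org.apache.hadoop.mapreduce.task.reduce;  // same package: the interface is package-private

class InterruptingExceptionReporter implements ExceptionReporter {
  private volatile Throwable firstError;
  private final Thread shuffleDriver;

  InterruptingExceptionReporter(Thread shuffleDriver) {
    this.shuffleDriver = shuffleDriver;
  }

  public void reportException(Throwable t) {
    if (firstError == null) {
      firstError = t;           // keep only the first failure
    }
    shuffleDriver.interrupt();  // wake the driver so it can rethrow
  }

  Throwable getFirstError() {
    return firstError;
  }
}
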
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=810944&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Thu Sep  3 13:56:21 2009
@@ -0,0 +1,515 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.task.reduce;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.IFileInputStream;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.task.reduce.MapOutput.Type;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.ReflectionUtils;
+
+class Fetcher<K,V> extends Thread {
+  
+  private static final Log LOG = LogFactory.getLog(Fetcher.class);
+  
+  /** Number of ms before timing out a copy */
+  private static final int DEFAULT_STALLED_COPY_TIMEOUT = 3 * 60 * 1000;
+  
+  /** Basic/unit connection timeout (in milliseconds) */
+  private final static int UNIT_CONNECT_TIMEOUT = 60 * 1000;
+  
+  /* Default read timeout (in milliseconds) */
+  private final static int DEFAULT_READ_TIMEOUT = 3 * 60 * 1000;
+
+  private final Progressable reporter;
+  private static enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
+                                    CONNECTION, WRONG_REDUCE}
+  private final Counters.Counter connectionErrs;
+  private final Counters.Counter ioErrs;
+  private final Counters.Counter wrongLengthErrs;
+  private final Counters.Counter badIdErrs;
+  private final Counters.Counter wrongMapErrs;
+  private final Counters.Counter wrongReduceErrs;
+  private final MergeManager<K,V> merger;
+  private final ShuffleScheduler<K,V> scheduler;
+  private final ShuffleClientMetrics metrics;
+  private final ExceptionReporter exceptionReporter;
+  private final int id;
+  private static int nextId = 0;
+  private final int reduce;
+  
+  private final int connectionTimeout;
+  private final int readTimeout;
+  
+  // Decompression of map-outputs
+  private final CompressionCodec codec;
+  private final Decompressor decompressor;
+
+  public Fetcher(JobConf job, TaskAttemptID reduceId, 
+                 ShuffleScheduler<K,V> scheduler, MergeManager<K,V> merger,
+                 Reporter reporter, ShuffleClientMetrics metrics,
+                 ExceptionReporter exceptionReporter) {
+    this.reporter = reporter;
+    this.scheduler = scheduler;
+    this.merger = merger;
+    this.metrics = metrics;
+    this.exceptionReporter = exceptionReporter;
+    this.id = ++nextId;
+    this.reduce = reduceId.getTaskID().getId();
+    ioErrs = reporter.getCounter(ShuffleErrors.IO_ERROR);
+    wrongLengthErrs = reporter.getCounter(ShuffleErrors.WRONG_LENGTH);
+    badIdErrs = reporter.getCounter(ShuffleErrors.BAD_ID);
+    wrongMapErrs = reporter.getCounter(ShuffleErrors.WRONG_MAP);
+    connectionErrs = reporter.getCounter(ShuffleErrors.CONNECTION);
+    wrongReduceErrs = reporter.getCounter(ShuffleErrors.WRONG_REDUCE);
+    
+    if (job.getCompressMapOutput()) {
+      Class<? extends CompressionCodec> codecClass =
+        job.getMapOutputCompressorClass(DefaultCodec.class);
+      codec = ReflectionUtils.newInstance(codecClass, job);
+      decompressor = CodecPool.getDecompressor(codec);
+    } else {
+      codec = null;
+      decompressor = null;
+    }
+
+    this.connectionTimeout = 
+      job.getInt("mapred.shuffle.connect.timeout",
+                 DEFAULT_STALLED_COPY_TIMEOUT);
+    this.readTimeout = 
+      job.getInt("mapred.shuffle.read.timeout", DEFAULT_READ_TIMEOUT);
+    
+    setName("fetcher#" + id);
+    setDaemon(true);
+  }
+  
+  public void run() {
+    try {
+      while (true) {
+        MapHost host = null;
+        try {
+          // If merge is on, block
+          merger.waitForInMemoryMerge();
+
+          // Get a host to shuffle from
+          host = scheduler.getHost();
+          metrics.threadBusy();
+
+          // Shuffle
+          copyFromHost(host);
+        } finally {
+          if (host != null) {
+            scheduler.freeHost(host);
+            metrics.threadFree();            
+          }
+        }
+      }
+    } catch (InterruptedException ie) {
+      return;
+    } catch (Throwable t) {
+      exceptionReporter.reportException(t);
+    }
+  }
+  
+  /**
+   * The crux of the matter...
+   * 
+   * @param host {@link MapHost} from which we need to  
+   *              shuffle available map-outputs.
+   */
+  private void copyFromHost(MapHost host) throws IOException {
+    // Get completed maps on 'host'
+    List<TaskAttemptID> maps = scheduler.getMapsForHost(host);
+    
+    // Sanity check to catch hosts with only 'OBSOLETE' maps, 
+    // especially at the tail of large jobs
+    if (maps.size() == 0) {
+      return;
+    }
+    
+    LOG.debug("Fetcher " + id + " going to fetch from " + host);
+    for (TaskAttemptID tmp: maps) {
+      LOG.debug(tmp);
+    }
+    
+    // List of maps to be fetched yet
+    Set<TaskAttemptID> remaining = new HashSet<TaskAttemptID>(maps);
+    
+    // Construct the url and connect
+    DataInputStream input;
+    boolean connectSucceeded = false;
+    
+    try {
+      URLConnection connection = getMapOutputURL(host, maps).openConnection();
+      connectSucceeded = true;
+      input = 
+        new DataInputStream(getInputStream(connection, connectionTimeout,
+                                           readTimeout));
+
+    } catch (IOException ie) {
+      ioErrs.increment(1);
+      LOG.warn("Failed to connect to " + host + " with " + remaining.size() + 
+               " map outputs", ie);
+
+      // If connect did not succeed, just mark all the maps as failed,
+      // indirectly penalizing the host
+      if (!connectSucceeded) {
+        for(TaskAttemptID left: remaining) {
+          scheduler.copyFailed(left, host, connectSucceeded);
+        }
+      } else {
+        // If we got a read error at this stage, it implies there was a problem
+        // with the first map, typically lost map. So, penalize only that map
+        // and add the rest
+        TaskAttemptID firstMap = maps.get(0);
+        scheduler.copyFailed(firstMap, host, connectSucceeded);
+      }
+      
+      // Add back all the remaining maps, WITHOUT marking them as failed
+      for(TaskAttemptID left: remaining) {
+        scheduler.putBackKnownMapOutput(host, left);
+      }
+      
+      return;
+    }
+    
+    try {
+      // Loop through available map-outputs and fetch them
+      // On any error, good becomes false and we exit after putting back
+      // the remaining maps to the yet_to_be_fetched list
+      boolean good = true;
+      while (!remaining.isEmpty() && good) {
+        good = copyMapOutput(host, input, remaining);
+      }
+      
+      // Drain the buffer, just in case
+      final int DRAIN_BUF_SIZE = 4096;
+      byte[] drainBuf = new byte[DRAIN_BUF_SIZE];
+      int retVal = 0;
+      while ( retVal != -1) {
+        retVal = input.read(drainBuf, 0, DRAIN_BUF_SIZE);
+      }
+      input.close();
+      
+      // Sanity check
+      if (good && !remaining.isEmpty()) {
+        throw new IOException("server didn't return all expected map outputs: "
+            + remaining.size() + " left.");
+      }
+    } finally {
+      for (TaskAttemptID left : remaining) {
+        scheduler.putBackKnownMapOutput(host, left);
+      }
+    }
+      
+   }
+  
+  private boolean copyMapOutput(MapHost host,
+                                DataInputStream input,
+                                Set<TaskAttemptID> remaining) {
+    MapOutput<K,V> mapOutput = null;
+    TaskAttemptID mapId = null;
+    long decompressedLength = -1;
+    long compressedLength = -1;
+    
+    try {
+      long startTime = System.currentTimeMillis();
+      int forReduce = -1;
+      //Read the shuffle header
+      try {
+        ShuffleHeader header = new ShuffleHeader();
+        header.readFields(input);
+        mapId = TaskAttemptID.forName(header.mapId);
+        compressedLength = header.compressedLength;
+        decompressedLength = header.uncompressedLength;
+        forReduce = header.forReduce;
+      } catch (IllegalArgumentException e) {
+        badIdErrs.increment(1);
+        LOG.warn("Invalid map id ", e);
+        return false;
+      }
+
+ 
+      // Do some basic sanity verification
+      if (!verifySanity(compressedLength, decompressedLength, forReduce,
+          remaining, mapId)) {
+        return false;
+      }
+      
+      LOG.debug("header: " + mapId + ", len: " + compressedLength + 
+               ", decomp len: " + decompressedLength);
+      
+      // Get the location for the map output - either in-memory or on-disk
+      mapOutput = merger.reserve(mapId, decompressedLength, id);
+      
+      // Check if we can shuffle *now* ...
+      if (mapOutput.getType() == Type.WAIT) {
+        LOG.info("fetcher#" + id + " - MergerManager returned Status.WAIT ...");
+        return false;
+      } 
+      
+      // Go!
+      LOG.info("fetcher#" + id + " about to shuffle output of map " + 
+               mapOutput.getMapId() + " decomp: " +
+               decompressedLength + " len: " + compressedLength + " to " +
+               mapOutput.getType());
+      if (mapOutput.getType() == Type.MEMORY) {
+        shuffleToMemory(host, mapOutput, input, 
+                        (int) decompressedLength, (int) compressedLength);
+      } else {
+        shuffleToDisk(host, mapOutput, input, compressedLength);
+      }
+      
+      // Inform the shuffle scheduler
+      long endTime = System.currentTimeMillis();
+      scheduler.copySucceeded(mapId, host, compressedLength, 
+                              endTime - startTime, mapOutput);
+      // Note successful shuffle
+      remaining.remove(mapId);
+      metrics.successFetch();
+      return true;
+    } catch (IOException ioe) {
+      ioErrs.increment(1);
+      if (mapId == null || mapOutput == null) {
+        LOG.info("fetcher#" + id + " failed to read map header" + 
+                 mapId + " decomp: " + 
+                 decompressedLength + ", " + compressedLength, ioe);
+        return false;
+      }
+      
+      LOG.info("Failed to shuffle output of " + mapId + 
+               " from " + host.getHostName(), ioe); 
+
+      // Inform the shuffle-scheduler
+      mapOutput.abort();
+      scheduler.copyFailed(mapId, host, true);
+      metrics.failedFetch();
+      return false;
+    }
+
+  }
+  
+  /**
+   * Do some basic verification on the input received -- Being defensive
+   * @param compressedLength
+   * @param decompressedLength
+   * @param forReduce
+   * @param remaining
+   * @param mapId
+   * @return true/false, based on if the verification succeeded or not
+   */
+  private boolean verifySanity(long compressedLength, long decompressedLength,
+      int forReduce, Set<TaskAttemptID> remaining, TaskAttemptID mapId) {
+    if (compressedLength < 0 || decompressedLength < 0) {
+      wrongLengthErrs.increment(1);
+      LOG.warn(getName() + " invalid lengths in map output header: id: " +
+               mapId + " len: " + compressedLength + ", decomp len: " + 
+               decompressedLength);
+      return false;
+    }
+    
+    if (forReduce != reduce) {
+      wrongReduceErrs.increment(1);
+      LOG.warn(getName() + " data for the wrong reduce map: " +
+               mapId + " len: " + compressedLength + " decomp len: " +
+               decompressedLength + " for reduce " + forReduce);
+      return false;
+    }
+
+    // Sanity check
+    if (!remaining.contains(mapId)) {
+      wrongMapErrs.increment(1);
+      LOG.warn("Invalid map-output! Received output for " + mapId);
+      return false;
+    }
+    
+    return true;
+  }
+
+  /**
+   * Create the map-output-url. This will contain all the map ids
+   * separated by commas
+   * @param host
+   * @param maps
+   * @return
+   * @throws MalformedURLException
+   */
+  private URL getMapOutputURL(MapHost host, List<TaskAttemptID> maps
+                              )  throws MalformedURLException {
+    // Get the base url
+    StringBuffer url = new StringBuffer(host.getBaseUrl());
+    
+    boolean first = true;
+    for (TaskAttemptID mapId : maps) {
+      if (!first) {
+        url.append(",");
+      }
+      url.append(mapId);
+      first = false;
+    }
+   
+    LOG.debug("MapOutput URL for " + host + " -> " + url.toString());
+    return new URL(url.toString());
+  }
+  
+  /** 
+   * The connection establishment is attempted multiple times and is given up 
+   * only on the last failure. Instead of connecting with a timeout of 
+   * X, we try connecting with a timeout of x < X but multiple times. 
+   */
+  private InputStream getInputStream(URLConnection connection, 
+                                     int connectionTimeout, 
+                                     int readTimeout) 
+  throws IOException {
+    int unit = 0;
+    if (connectionTimeout < 0) {
+      throw new IOException("Invalid timeout "
+                            + "[timeout = " + connectionTimeout + " ms]");
+    } else if (connectionTimeout > 0) {
+      unit = Math.min(UNIT_CONNECT_TIMEOUT, connectionTimeout);
+    }
+    // set the read timeout to the total timeout
+    connection.setReadTimeout(readTimeout);
+    // set the connect timeout to the unit-connect-timeout
+    connection.setConnectTimeout(unit);
+    while (true) {
+      try {
+        return connection.getInputStream();
+      } catch (IOException ioe) {
+        // update the total remaining connect-timeout
+        connectionTimeout -= unit;
+
+        // throw an exception if we have waited for timeout amount of time
+        // note that the updated value if timeout is used here
+        if (connectionTimeout == 0) {
+          throw ioe;
+        }
+
+        // reset the connect timeout for the last try
+        if (connectionTimeout < unit) {
+          unit = connectionTimeout;
+          // reset the connect time out for the final connect
+          connection.setConnectTimeout(unit);
+        }
+      }
+    }
+  }
+
+  private void shuffleToMemory(MapHost host, MapOutput<K,V> mapOutput, 
+                               InputStream input, 
+                               int decompressedLength, 
+                               int compressedLength) throws IOException {    
+    IFileInputStream checksumIn = 
+      new IFileInputStream(input, compressedLength);
+
+    input = checksumIn;       
+  
+    // Are map-outputs compressed?
+    if (codec != null) {
+      decompressor.reset();
+      input = codec.createInputStream(input, decompressor);
+    }
+  
+    // Copy map-output into an in-memory buffer
+    byte[] shuffleData = mapOutput.getMemory();
+    
+    try {
+      IOUtils.readFully(input, shuffleData, 0, shuffleData.length);
+      metrics.inputBytes(shuffleData.length);
+      reporter.progress();
+      LOG.info("Read " + shuffleData.length + " bytes from map-output for " +
+               mapOutput.getMapId());
+    } catch (IOException ioe) {      
+      // Close the streams
+      IOUtils.cleanup(LOG, input);
+
+      // Re-throw
+      throw ioe;
+    }
+
+  }
+  
+  private void shuffleToDisk(MapHost host, MapOutput<K,V> mapOutput, 
+                             InputStream input, 
+                             long compressedLength) 
+  throws IOException {
+    // Copy data to local-disk
+    OutputStream output = mapOutput.getDisk();
+    long bytesLeft = compressedLength;
+    try {
+      final int BYTES_TO_READ = 64 * 1024;
+      byte[] buf = new byte[BYTES_TO_READ];
+      while (bytesLeft > 0) {
+        int n = input.read(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ));
+        if (n < 0) {
+          throw new IOException("read past end of stream reading " + 
+                                mapOutput.getMapId());
+        }
+        output.write(buf, 0, n);
+        bytesLeft -= n;
+        metrics.inputBytes(n);
+        reporter.progress();
+      }
+
+      LOG.info("Read " + (compressedLength - bytesLeft) + 
+               " bytes from map-output for " +
+               mapOutput.getMapId());
+
+      output.close();
+    } catch (IOException ioe) {
+      // Close the streams
+      IOUtils.cleanup(LOG, input, output);
+
+      // Re-throw
+      throw ioe;
+    }
+
+    // Sanity check
+    if (bytesLeft != 0) {
+      throw new IOException("Incomplete map output received for " +
+                            mapOutput.getMapId() + " from " +
+                            host.getHostName() + " (" + 
+                            bytesLeft + " bytes missing of " + 
+                            compressedLength + ")"
+      );
+    }
+  }
+}

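Fetcher reads its connection and read timeouts from the job configuration ("mapred.shuffle.connect.timeout" and "mapred.shuffle.read.timeout", both defaulting to three minutes), and getInputStream retries the connect in UNIT_CONNECT_TIMEOUT slices until that budget is spent. A small sketch of tuning those knobs in a job; the property names are taken from the constructor above, the values are illustrative:

import org.apache.hadoop.mapred.JobConf;

class ShuffleTimeoutConfig {
  static void tighten(JobConf job) {
    // Give up connecting to a tasktracker after 60 seconds overall...
    job.setInt("mapred.shuffle.connect.timeout", 60 * 1000);
    // ...and time out a stalled read after two minutes.
    job.setInt("mapred.shuffle.read.timeout", 2 * 60 * 1000);
  }
}
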
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryReader.java?rev=810944&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryReader.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryReader.java Thu Sep  3 13:56:21 2009
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.task.reduce;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.mapred.IFile.Reader;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+/**
+ * <code>IFile.InMemoryReader</code> to read map-outputs present in-memory.
+ */
+public class InMemoryReader<K, V> extends Reader<K, V> {
+  private final TaskAttemptID taskAttemptId;
+  private final MergeManager<K,V> merger;
+  DataInputBuffer memDataIn = new DataInputBuffer();
+  private int start;
+  private int length;
+
+  public InMemoryReader(MergeManager<K,V> merger, TaskAttemptID taskAttemptId,
+                        byte[] data, int start, int length)
+  throws IOException {
+    super(null, null, length - start, null, null);
+    this.merger = merger;
+    this.taskAttemptId = taskAttemptId;
+
+    buffer = data;
+    bufferSize = (int)fileLength;
+    memDataIn.reset(buffer, start, length);
+    this.start = start;
+    this.length = length;
+  }
+
+  @Override
+  public void reset(int offset) {
+    memDataIn.reset(buffer, start + offset, length);
+    bytesRead = offset;
+    eof = false;
+  }
+
+  @Override
+  public long getPosition() throws IOException {
+    // InMemoryReader does not initialize streams like Reader, so in.getPos()
+    // would not work. Instead, return the number of uncompressed bytes read,
+    // which will be correct since in-memory data is not compressed.
+    return bytesRead;
+  }
+  
+  @Override
+  public long getLength() { 
+    return fileLength;
+  }
+  
+  private void dumpOnError() {
+    File dumpFile = new File("../output/" + taskAttemptId + ".dump");
+    System.err.println("Dumping corrupt map-output of " + taskAttemptId + 
+                       " to " + dumpFile.getAbsolutePath());
+    try {
+      FileOutputStream fos = new FileOutputStream(dumpFile);
+      fos.write(buffer, 0, bufferSize);
+      fos.close();
+    } catch (IOException ioe) {
+      System.err.println("Failed to dump map-output of " + taskAttemptId);
+    }
+  }
+  
+  public boolean nextRawKey(DataInputBuffer key) throws IOException {
+    try {
+      if (!positionToNextRecord(memDataIn)) {
+        return false;
+      }
+      // Setup the key
+      int pos = memDataIn.getPosition();
+      byte[] data = memDataIn.getData();
+      key.reset(data, pos, currentKeyLength);
+      // Position for the next value
+      long skipped = memDataIn.skip(currentKeyLength);
+      if (skipped != currentKeyLength) {
+        throw new IOException("Rec# " + recNo + 
+            ": Failed to skip past key of length: " + 
+            currentKeyLength);
+      }
+
+      // Record the byte
+      bytesRead += currentKeyLength;
+      return true;
+    } catch (IOException ioe) {
+      dumpOnError();
+      throw ioe;
+    }
+  }
+  
+  public void nextRawValue(DataInputBuffer value) throws IOException {
+    try {
+      int pos = memDataIn.getPosition();
+      byte[] data = memDataIn.getData();
+      value.reset(data, pos, currentValueLength);
+
+      // Position for the next record
+      long skipped = memDataIn.skip(currentValueLength);
+      if (skipped != currentValueLength) {
+        throw new IOException("Rec# " + recNo + 
+            ": Failed to skip past value of length: " + 
+            currentValueLength);
+      }
+      // Record the byte
+      bytesRead += currentValueLength;
+
+      ++recNo;
+    } catch (IOException ioe) {
+      dumpOnError();
+      throw ioe;
+    }
+  }
+    
+  public void close() {
+    // Release
+    dataIn = null;
+    buffer = null;
+      // Inform the MergeManager
+    if (merger != null) {
+      merger.unreserve(bufferSize);
+    }
+  }
+}

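InMemoryReader walks IFile-framed records straight out of a byte array, so callers iterate it like an on-disk IFile.Reader: nextRawKey() until it returns false, with nextRawValue() after each key. A hedged in-package iteration sketch (the byte array is assumed to hold one complete in-memory map output):

import java.io.IOException;

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.mapreduce.TaskAttemptID;

class InMemoryReadLoop {
  // Count every key/value pair held in 'data' for the given map attempt.
  static <K, V> int countRecords(MergeManager<K, V> merger, TaskAttemptID mapId,
                                 byte[] data) throws IOException {
    InMemoryReader<K, V> reader =
        new InMemoryReader<K, V>(merger, mapId, data, 0, data.length);
    DataInputBuffer key = new DataInputBuffer();
    DataInputBuffer value = new DataInputBuffer();
    int records = 0;
    while (reader.nextRawKey(key)) {
      reader.nextRawValue(value);
      records++;
    }
    reader.close();   // releases the buffer and unreserves memory in the merger
    return records;
  }
}
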
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryWriter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryWriter.java?rev=810944&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryWriter.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryWriter.java Thu Sep  3 13:56:21 2009
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.task.reduce;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.BoundedByteArrayOutputStream;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.IFile;
+import org.apache.hadoop.mapred.IFileOutputStream;
+import org.apache.hadoop.mapred.IFile.Writer;
+
+public class InMemoryWriter<K, V> extends Writer<K, V> {
+  private DataOutputStream out;
+  
+  public InMemoryWriter(BoundedByteArrayOutputStream arrayStream) {
+    super(null);
+    this.out = 
+      new DataOutputStream(new IFileOutputStream(arrayStream));
+  }
+  
+  public void append(K key, V value) throws IOException {
+    throw new UnsupportedOperationException
+    ("InMemoryWriter.append(K key, V value");
+  }
+  
+  public void append(DataInputBuffer key, DataInputBuffer value)
+  throws IOException {
+    int keyLength = key.getLength() - key.getPosition();
+    if (keyLength < 0) {
+      throw new IOException("Negative key-length not allowed: " + keyLength + 
+                            " for " + key);
+    }
+    
+    int valueLength = value.getLength() - value.getPosition();
+    if (valueLength < 0) {
+      throw new IOException("Negative value-length not allowed: " + 
+                            valueLength + " for " + value);
+    }
+
+    WritableUtils.writeVInt(out, keyLength);
+    WritableUtils.writeVInt(out, valueLength);
+    out.write(key.getData(), key.getPosition(), keyLength); 
+    out.write(value.getData(), value.getPosition(), valueLength); 
+  }
+
+  public void close() throws IOException {
+    // Write EOF_MARKER for key/value length
+    WritableUtils.writeVInt(out, IFile.EOF_MARKER);
+    WritableUtils.writeVInt(out, IFile.EOF_MARKER);
+    
+    // Close the stream 
+    out.close();
+    out = null;
+  }
+
+}

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/MapHost.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/MapHost.java?rev=810944&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/MapHost.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/MapHost.java Thu Sep  3 13:56:21 2009
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.task.reduce;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
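+/**
+ * <code>MapHost</code> tracks a single host serving map outputs: the map
+ * attempts known to be available on it and its current fetch state
+ * (idle, pending, busy or penalized).
+ */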
+class MapHost {
+  
+  public static enum State {
+    IDLE,               // No map outputs available
+    BUSY,               // Map outputs are being fetched
+    PENDING,            // Known map outputs which need to be fetched
+    PENALIZED           // Host penalized due to shuffle failures
+  }
+  
+  private State state = State.IDLE;
+  private final String hostName;
+  private final String baseUrl;
+  private List<TaskAttemptID> maps = new ArrayList<TaskAttemptID>();
+  
+  public MapHost(String hostName, String baseUrl) {
+    this.hostName = hostName;
+    this.baseUrl = baseUrl;
+  }
+  
+  public State getState() {
+    return state;
+  }
+
+  public String getHostName() {
+    return hostName;
+  }
+
+  public String getBaseUrl() {
+    return baseUrl;
+  }
+
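+  /**
+   * Record a newly reported map output on this host, moving the host from
+   * IDLE to PENDING if necessary.
+   */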
+  public synchronized void addKnownMap(TaskAttemptID mapId) {
+    maps.add(mapId);
+    if (state == State.IDLE) {
+      state = State.PENDING;
+    }
+  }
+  
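+  /**
+   * Atomically return the map outputs currently known on this host and reset
+   * the list, so that outputs reported later are tracked separately.
+   */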
+  public synchronized List<TaskAttemptID> getAndClearKnownMaps() {
+    List<TaskAttemptID> currentKnownMaps = maps;
+    maps = new ArrayList<TaskAttemptID>();
+    return currentKnownMaps;
+  }
+  
+  public synchronized void markBusy() {
+    state = State.BUSY;
+  }
+  
+  public synchronized void markPenalized() {
+    state = State.PENALIZED;
+  }
+  
+  public synchronized int getNumKnownMapOutputs() {
+    return maps.size();
+  }
+
+  /**
+   * Called when the node is done with its penalty or done copying.
+   * @return the host's new state
+   */
+  public synchronized State markAvailable() {
+    if (maps.isEmpty()) {
+      state = State.IDLE;
+    } else {
+      state = State.PENDING;
+    }
+    return state;
+  }
+  
+  @Override
+  public String toString() {
+    return hostName;
+  }
+  
+  /**
+   * Mark the host as penalized
+   */
+  public synchronized void penalize() {
+    state = State.PENALIZED;
+  }
+}

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java?rev=810944&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java Thu Sep  3 13:56:21 2009
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.task.reduce;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Comparator;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BoundedByteArrayOutputStream;
+import org.apache.hadoop.mapred.TaskTracker;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
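+/**
+ * <code>MapOutput</code> represents a single map output being fetched by the
+ * shuffle. Depending on its {@link Type} it is buffered in memory, spilled to
+ * local disk, or is a WAIT placeholder used when the output cannot be fetched
+ * immediately.
+ */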
+class MapOutput<K,V> {
+  private static final Log LOG = LogFactory.getLog(MapOutput.class);
+  private static AtomicInteger ID = new AtomicInteger(0);
+  
+  public static enum Type {
+    WAIT,
+    MEMORY,
+    DISK
+  }
+  
+  private final int id;
+  
+  private final MergeManager<K,V> merger;
+  private final TaskAttemptID mapId;
+  
+  private final long size;
+  
+  private final byte[] memory;
+  private BoundedByteArrayOutputStream byteStream;
+  
+  private final FileSystem localFS;
+  private final Path tmpOutputPath;
+  private final Path outputPath;
+  private final OutputStream disk; 
+  
+  private final Type type;
+  
+  private final boolean primaryMapOutput;
+  
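+  /**
+   * Construct an on-disk map output: a temporary file is created under the
+   * local job directory and renamed to its final path on commit.
+   */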
+  MapOutput(TaskAttemptID mapId, MergeManager<K,V> merger, long size, 
+            Configuration conf, LocalDirAllocator localDirAllocator,
+            int fetcher, boolean primaryMapOutput)  throws IOException {
+    this.id = ID.incrementAndGet();
+    this.mapId = mapId;
+    this.merger = merger;
+
+    type = Type.DISK;
+
+    memory = null;
+    byteStream = null;
+
+    this.size = size;
+    
+    this.localFS = FileSystem.getLocal(conf);
+    String filename = "map_" + mapId.getTaskID().getId() + ".out";
+    String tmpOutput = Path.SEPARATOR + TaskTracker.getJobCacheSubdir() +
+                       Path.SEPARATOR + mapId.getJobID() +
+                       Path.SEPARATOR + merger.getReduceId() +
+                       Path.SEPARATOR + "output" + 
+                       Path.SEPARATOR + filename + 
+                       "." + fetcher; 
+
+    tmpOutputPath = 
+      localDirAllocator.getLocalPathForWrite(tmpOutput, size, conf);
+    outputPath = new Path(tmpOutputPath.getParent(), filename);
+    disk = localFS.create(tmpOutputPath);
+    
+    this.primaryMapOutput = primaryMapOutput;
+  }
+  
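+  /**
+   * Construct an in-memory map output backed by a fixed-size
+   * {@link BoundedByteArrayOutputStream}.
+   */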
+  MapOutput(TaskAttemptID mapId, MergeManager<K,V> merger, int size, 
+            boolean primaryMapOutput) {
+    this.id = ID.incrementAndGet();
+    this.mapId = mapId;
+    this.merger = merger;
+
+    type = Type.MEMORY;
+    byteStream = new BoundedByteArrayOutputStream(size);
+    memory = byteStream.getBuffer();
+
+    this.size = size;
+    
+    localFS = null;
+    disk = null;
+    outputPath = null;
+    tmpOutputPath = null;
+    
+    this.primaryMapOutput = primaryMapOutput;
+  }
+
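+  /**
+   * Construct a placeholder of type WAIT; it holds no data and cannot be
+   * committed.
+   */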
+  public MapOutput(TaskAttemptID mapId) {
+    this.id = ID.incrementAndGet();
+    this.mapId = mapId;
+    
+    type = Type.WAIT;
+    merger = null;
+    memory = null;
+    byteStream = null;
+    
+    size = -1;
+    
+    localFS = null;
+    disk = null;
+    outputPath = null;
+    tmpOutputPath = null;
+
+    this.primaryMapOutput = false;
+  }
+  
+  public boolean isPrimaryMapOutput() {
+    return primaryMapOutput;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof MapOutput) {
+      return id == ((MapOutput)obj).id;
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return id;
+  }
+
+  public Path getOutputPath() {
+    return outputPath;
+  }
+
+  public byte[] getMemory() {
+    return memory;
+  }
+
+  public BoundedByteArrayOutputStream getArrayStream() {
+    return byteStream;
+  }
+  
+  public OutputStream getDisk() {
+    return disk;
+  }
+
+  public TaskAttemptID getMapId() {
+    return mapId;
+  }
+
+  public Type getType() {
+    return type;
+  }
+
+  public long getSize() {
+    return size;
+  }
+
+  public void commit() throws IOException {
+    if (type == Type.MEMORY) {
+      merger.closeInMemoryFile(this);
+    } else if (type == Type.DISK) {
+      localFS.rename(tmpOutputPath, outputPath);
+      merger.closeOnDiskFile(outputPath);
+    } else {
+      throw new IOException("Cannot commit MapOutput of type WAIT!");
+    }
+  }
+  
+  public void abort() {
+    if (type == Type.MEMORY) {
+      merger.unreserve(memory.length);
+    } else if (type == Type.DISK) {
+      try {
+        localFS.delete(tmpOutputPath, false);
+      } catch (IOException ie) {
+        LOG.info("Failure to clean up " + tmpOutputPath, ie);
+      }
+    } else {
+      throw new IllegalArgumentException(
+          "Cannot abort MapOutput of type WAIT!");
+    }
+  }
+  
+  @Override
+  public String toString() {
+    return "MapOutput(" + mapId + ", " + type + ")";
+  }
+  
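+  /**
+   * Orders map outputs by size (smallest first), breaking ties by their
+   * unique creation id.
+   */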
+  public static class MapOutputComparator<K, V> 
+  implements Comparator<MapOutput<K, V>> {
+    public int compare(MapOutput<K, V> o1, MapOutput<K, V> o2) {
+      if (o1.id == o2.id) { 
+        return 0;
+      }
+      
+      if (o1.size < o2.size) {
+        return -1;
+      } else if (o1.size > o2.size) {
+        return 1;
+      }
+      
+      if (o1.id < o2.id) {
+        return -1;
+      } else {
+        return 1;
+      }
+    }
+  }
+  
+}


