hadoop-mapreduce-commits mailing list archives

From: d...@apache.org
Subject: svn commit: r810944 [2/4] - in /hadoop/mapreduce/trunk: ./ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/task/ src/java/org/apache/hadoop/mapreduce/task/reduce/...
Date: Thu, 03 Sep 2009 13:56:22 GMT
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=810944&r1=810943&r2=810944&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Thu Sep  3 13:56:21 2009
@@ -20,72 +20,38 @@
 
 import java.io.DataInput;
 import java.io.DataOutput;
-import java.io.File;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.lang.Math;
-import java.net.URI;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.net.URLConnection;
-import java.text.DecimalFormat;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.ChecksumFileSystem;
-import org.apache.hadoop.fs.FSError;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.io.WritableFactory;
-import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.Decompressor;
 import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.mapred.IFile.*;
-import org.apache.hadoop.mapred.Merger.Segment;
 import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator;
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskCounter;
-import org.apache.hadoop.metrics.MetricsContext;
-import org.apache.hadoop.metrics.MetricsRecord;
-import org.apache.hadoop.metrics.MetricsUtil;
-import org.apache.hadoop.metrics.Updater;
+import org.apache.hadoop.mapreduce.task.reduce.Shuffle;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.util.StringUtils;
 
 /** A Reduce task. */
-class ReduceTask extends Task {
+public class ReduceTask extends Task {
 
   static {                                        // register a ctor
     WritableFactories.setFactory
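
For readers new to the static registration block above: WritableFactories lets Hadoop instantiate a Writable by class during deserialization. A minimal sketch of the idiom, shown here with a hypothetical MyWritable rather than ReduceTask itself:

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.WritableFactories;
    import org.apache.hadoop.io.WritableFactory;

    // Hypothetical Writable illustrating the factory-registration idiom.
    public class MyWritable implements Writable {
      static {                                      // register a ctor
        WritableFactories.setFactory(MyWritable.class, new WritableFactory() {
          public Writable newInstance() { return new MyWritable(); }
        });
      }

      private int value;

      public void write(DataOutput out) throws IOException {
        out.writeInt(value);
      }

      public void readFields(DataInput in) throws IOException {
        value = in.readInt();
      }
    }
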
@@ -97,7 +63,6 @@
   
   private static final Log LOG = LogFactory.getLog(ReduceTask.class.getName());
   private int numMaps;
-  private ReduceCopier reduceCopier;
 
   private CompressionCodec codec;
 
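
The codec field above is populated by initCodec() inside run(); the MapOutputCopier removed later in this diff builds its decompression codec the same way. A sketch of that pattern (the helper name getMapOutputCodec is illustrative):

    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.DefaultCodec;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.util.ReflectionUtils;

    // Instantiate the configured map-output codec, falling back to
    // DefaultCodec when none is set; null when outputs are uncompressed.
    static CompressionCodec getMapOutputCodec(JobConf job) {
      if (!job.getCompressMapOutput()) {
        return null;
      }
      Class<? extends CompressionCodec> codecClass =
        job.getMapOutputCompressorClass(DefaultCodec.class);
      return ReflectionUtils.newInstance(codecClass, job);
    }
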
@@ -110,16 +75,20 @@
   private Progress copyPhase;
   private Progress sortPhase;
   private Progress reducePhase;
+  private Counters.Counter shuffledMapsCounter = 
+    getCounters().findCounter(Counter.SHUFFLED_MAPS);
   private Counters.Counter reduceShuffleBytes = 
-    getCounters().findCounter(TaskCounter.REDUCE_SHUFFLE_BYTES);
+    getCounters().findCounter(Counter.REDUCE_SHUFFLE_BYTES);
   private Counters.Counter reduceInputKeyCounter = 
-    getCounters().findCounter(TaskCounter.REDUCE_INPUT_GROUPS);
+    getCounters().findCounter(Counter.REDUCE_INPUT_GROUPS);
   private Counters.Counter reduceInputValueCounter = 
-    getCounters().findCounter(TaskCounter.REDUCE_INPUT_RECORDS);
+    getCounters().findCounter(Counter.REDUCE_INPUT_RECORDS);
   private Counters.Counter reduceOutputCounter = 
-    getCounters().findCounter(TaskCounter.REDUCE_OUTPUT_RECORDS);
+    getCounters().findCounter(Counter.REDUCE_OUTPUT_RECORDS);
+  private Counters.Counter reduceCombineInputCounter =
+    getCounters().findCounter(Counter.COMBINE_INPUT_RECORDS);
   private Counters.Counter reduceCombineOutputCounter =
-    getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
+    getCounters().findCounter(Counter.COMBINE_OUTPUT_RECORDS);
 
   // A custom comparator for map output files. Here the ordering is determined
   // by the file's size and path. In case of files with same size and different
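
The comment above is cut off at the hunk boundary; it describes ordering map-output files by size, with the path as tie-breaker so that equal-sized files remain distinct inside a sorted set. A sketch of such a comparator (the variable name is illustrative, not verbatim from this file):

    import java.util.Comparator;
    import org.apache.hadoop.fs.FileStatus;

    // Order map-output files by length; on equal lengths compare paths so
    // that two distinct files never collapse into one entry of a TreeSet.
    Comparator<FileStatus> mapOutputFileComparator = new Comparator<FileStatus>() {
      public int compare(FileStatus a, FileStatus b) {
        if (a.getLen() < b.getLen()) {
          return -1;
        } else if (a.getLen() > b.getLen()) {
          return 1;
        }
        return a.getPath().toString().compareTo(b.getPath().toString());
      }
    };
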
@@ -267,9 +236,9 @@
        super(in, comparator, keyClass, valClass, conf, reporter);
        this.umbilical = umbilical;
        this.skipGroupCounter = 
-         reporter.getCounter(TaskCounter.REDUCE_SKIPPED_GROUPS);
+         reporter.getCounter(Counter.REDUCE_SKIPPED_GROUPS);
        this.skipRecCounter = 
-         reporter.getCounter(TaskCounter.REDUCE_SKIPPED_RECORDS);
+         reporter.getCounter(Counter.REDUCE_SKIPPED_RECORDS);
        this.toWriteSkipRecs = toWriteSkipRecs() &&  
          SkipBadRecords.getSkipOutputPath(conf)!=null;
        this.keyClass = keyClass;
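
The REDUCE_SKIPPED_GROUPS and REDUCE_SKIPPED_RECORDS counters above feed the bad-record skipping feature. On the job-submission side, skipping is tuned through SkipBadRecords; a minimal sketch (the threshold of 10 is an arbitrary example):

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.SkipBadRecords;

    JobConf job = new JobConf();
    // Allow the framework to skip up to 10 bad key groups per reduce
    // attempt before the task is failed.
    SkipBadRecords.setReducerMaxSkipGroups(job, 10L);
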
@@ -279,12 +248,12 @@
        mayBeSkip();
      }
      
-     void nextKey() throws IOException {
+     public void nextKey() throws IOException {
        super.nextKey();
        mayBeSkip();
      }
      
-     boolean more() { 
+     public boolean more() { 
        return super.more() && hasNext; 
      }
      
@@ -338,7 +307,6 @@
   @SuppressWarnings("unchecked")
   public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
     throws IOException, InterruptedException, ClassNotFoundException {
-    this.umbilical = umbilical;
     job.setBoolean("mapred.skip.on", isSkipping());
 
     if (isMapOrReduce()) {
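
In the full method body (elided by the hunk context) the isMapOrReduce() guard above registers the copy/sort/reduce phases declared earlier in this file. A sketch of how Hadoop's Progress tree behaves, assuming a fresh root node:

    import org.apache.hadoop.util.Progress;

    // A root Progress split into equally weighted child phases; completing
    // a phase advances the parent to the next one.
    Progress root = new Progress();
    Progress copyPhase = root.addPhase("copy");
    Progress sortPhase = root.addPhase("sort");
    Progress reducePhase = root.addPhase("reduce");

    copyPhase.complete();        // copy done; parent moves on to sort
    sortPhase.set(0.5f);         // sort halfway
    float overall = root.get();  // 0.5 = (1 + 0.5 + 0) / 3
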
@@ -368,31 +336,35 @@
     
     // Initialize the codec
     codec = initCodec();
-
+    RawKeyValueIterator rIter = null;
     boolean isLocal = "local".equals(job.get("mapred.job.tracker", "local"));
     if (!isLocal) {
-      reduceCopier = new ReduceCopier(umbilical, job, reporter);
-      if (!reduceCopier.fetchOutputs()) {
-        if(reduceCopier.mergeThrowable instanceof FSError) {
-          throw (FSError)reduceCopier.mergeThrowable;
-        }
-        throw new IOException("Task: " + getTaskID() + 
-            " - The reduce copier failed", reduceCopier.mergeThrowable);
-      }
+      Class combinerClass = conf.getCombinerClass();
+      CombineOutputCollector combineCollector = 
+        (null != combinerClass) ? 
+            new CombineOutputCollector(reduceCombineOutputCounter) : null;
+
+      Shuffle shuffle = 
+        new Shuffle(getTaskID(), job, FileSystem.getLocal(job), umbilical, 
+                    super.lDirAlloc, reporter, codec, 
+                    combinerClass, combineCollector, 
+                    spilledRecordsCounter, reduceCombineInputCounter,
+                    shuffledMapsCounter,
+                    reduceShuffleBytes, failedShuffleCounter,
+                    mergedMapOutputsCounter,
+                    taskStatus, copyPhase, sortPhase, this);
+      rIter = shuffle.run();
+    } else {
+      final FileSystem rfs = FileSystem.getLocal(job).getRaw();
+      rIter = Merger.merge(job, rfs, job.getMapOutputKeyClass(),
+                           job.getMapOutputValueClass(), codec, 
+                           getMapFiles(rfs, true),
+                           !conf.getKeepFailedTaskFiles(), 
+                           job.getInt("io.sort.factor", 100),
+                           new Path(getTaskID().toString()), 
+                           job.getOutputKeyComparator(),
+                           reporter, spilledRecordsCounter, null, null);
     }
-    copyPhase.complete();                         // copy is already complete
-    setPhase(TaskStatus.Phase.SORT);
-    statusUpdate(umbilical);
-
-    final FileSystem rfs = FileSystem.getLocal(job).getRaw();
-    RawKeyValueIterator rIter = isLocal
-      ? Merger.merge(job, rfs, job.getMapOutputKeyClass(),
-          job.getMapOutputValueClass(), codec, getMapFiles(rfs, true),
-          !conf.getKeepFailedTaskFiles(), job.getInt("io.sort.factor", 100),
-          new Path(getTaskID().toString()), job.getOutputKeyComparator(),
-          reporter, spilledRecordsCounter, null, sortPhase)
-      : reduceCopier.createKVIterator(job, rfs, reporter);
-        
     // free up the data structures
     mapOutputFilesOnDisk.clear();
     
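
Whichever branch runs above, the result is a RawKeyValueIterator that the reducer-driving code (outside this hunk) drains. A minimal sketch of consuming such an iterator (the helper countRecords is illustrative only):

    import java.io.IOException;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.mapred.RawKeyValueIterator;

    // Drain a RawKeyValueIterator: next() advances to the following
    // serialized key/value pair, exposed as DataInputBuffers.
    static long countRecords(RawKeyValueIterator rIter) throws IOException {
      long records = 0;
      while (rIter.next()) {
        DataInputBuffer key = rIter.getKey();      // raw serialized key
        DataInputBuffer value = rIter.getValue();  // raw serialized value
        records++;
      }
      rIter.close();
      return records;
    }
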
@@ -561,2268 +533,4 @@
     reducer.run(reducerContext);
     output.close(reducerContext);
   }
-
-  private static enum CopyOutputErrorType {
-    NO_ERROR,
-    READ_ERROR,
-    OTHER_ERROR
-  };
-
-  class ReduceCopier<K, V> implements MRConstants {
-
-    /** Reference to the umbilical object */
-    private TaskUmbilicalProtocol umbilical;
-    private final TaskReporter reporter;
-    
-    /** Reference to the task object */
-    
-    /** Number of ms before timing out a copy */
-    private static final int STALLED_COPY_TIMEOUT = 3 * 60 * 1000;
-    
-    /** Max events to fetch in one go from the tasktracker */
-    private static final int MAX_EVENTS_TO_FETCH = 10000;
-
-    /**
-     * our reduce task instance
-     */
-    private ReduceTask reduceTask;
-    
-    /**
-     * the list of map outputs currently being copied
-     */
-    private List<MapOutputLocation> scheduledCopies;
-    
-    /**
-     *  the results of dispatched copy attempts
-     */
-    private List<CopyResult> copyResults;
-    
-    /**
-     *  the number of outputs to copy in parallel
-     */
-    private int numCopiers;
-    
-    /**
-     *  a number that is set to the max #fetches we'd schedule and then
-     *  pause the schduling
-     */
-    private int maxInFlight;
-    
-    /**
-     * the amount of time spent on fetching one map output before considering 
-     * it as failed and notifying the jobtracker about it.
-     */
-    private int maxBackoff;
-    
-    /**
-     * busy hosts from which copies are being backed off
-     * Map of host -> next contact time
-     */
-    private Map<String, Long> penaltyBox;
-    
-    /**
-     * the set of unique hosts from which we are copying
-     */
-    private Set<String> uniqueHosts;
-    
-    /**
-     * A reference to the RamManager for writing the map outputs to.
-     */
-    
-    private ShuffleRamManager ramManager;
-    
-    /**
-     * A reference to the local file system for writing the map outputs to.
-     */
-    private FileSystem localFileSys;
-
-    private FileSystem rfs;
-    /**
-     * Number of files to merge at a time
-     */
-    private int ioSortFactor;
-    
-    /**
-     * A reference to the throwable object (if merge throws an exception)
-     */
-    private volatile Throwable mergeThrowable;
-    
-    /** 
-     * A flag to indicate when to exit localFS merge
-     */
-    private volatile boolean exitLocalFSMerge = false;
-
-    /** 
-     * A flag to indicate when to exit getMapEvents thread 
-     */
-    private volatile boolean exitGetMapEvents = false;
-    
-    /**
-     * When we accumulate maxInMemOutputs number of files in ram, we merge/spill
-     */
-    private final int maxInMemOutputs;
-
-    /**
-     * Usage threshold for in-memory output accumulation.
-     */
-    private final float maxInMemCopyPer;
-
-    /**
-     * Maximum memory usage of map outputs to merge from memory into
-     * the reduce, in bytes.
-     */
-    private final long maxInMemReduce;
-
-    /**
-     * The threads for fetching the files.
-     */
-    private List<MapOutputCopier> copiers = null;
-    
-    /**
-     * The object for metrics reporting.
-     */
-    private ShuffleClientMetrics shuffleClientMetrics = null;
-    
-    /**
-     * the minimum interval between tasktracker polls
-     */
-    private static final long MIN_POLL_INTERVAL = 1000;
-    
-    /**
-     * a list of map output locations for fetch retrials 
-     */
-    private List<MapOutputLocation> retryFetches =
-      new ArrayList<MapOutputLocation>();
-    
-    /** 
-     * The set of required map outputs
-     */
-    private Set <TaskID> copiedMapOutputs = 
-      Collections.synchronizedSet(new TreeSet<TaskID>());
-    
-    /** 
-     * The set of obsolete map taskids.
-     */
-    private Set <TaskAttemptID> obsoleteMapIds = 
-      Collections.synchronizedSet(new TreeSet<TaskAttemptID>());
-    
-    private Random random = null;
-
-    /**
-     * the max of all the map completion times
-     */
-    private int maxMapRuntime;
-    
-    /**
-     * Maximum number of fetch-retries per-map.
-     */
-    private volatile int maxFetchRetriesPerMap;
-    
-    /**
-     * Combiner runner, if a combiner is needed
-     */
-    private CombinerRunner combinerRunner;
-
-    /**
-     * Resettable collector used for combine.
-     */
-    private CombineOutputCollector combineCollector = null;
-
-    /**
-     * Maximum percent of failed fetch attempt before killing the reduce task.
-     */
-    private static final float MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT = 0.5f;
-
-    /**
-     * Minimum percent of progress required to keep the reduce alive.
-     */
-    private static final float MIN_REQUIRED_PROGRESS_PERCENT = 0.5f;
-
-    /**
-     * Maximum percent of shuffle execution time required to keep the reducer alive.
-     */
-    private static final float MAX_ALLOWED_STALL_TIME_PERCENT = 0.5f;
-    
-    /**
-     * Minimum number of map fetch retries.
-     */
-    private static final int MIN_FETCH_RETRIES_PER_MAP = 2;
-
-    /**
-     * The minimum percentage of maps yet to be copied, 
-     * which indicates end of shuffle
-     */
-    private static final float MIN_PENDING_MAPS_PERCENT = 0.25f;
-    /**
-     * Maximum no. of unique maps from which we failed to fetch map-outputs
-     * even after {@link #maxFetchRetriesPerMap} retries; after this the
-     * reduce task is failed.
-     */
-    private int maxFailedUniqueFetches = 5;
-
-    /**
-     * The maps from which we fail to fetch map-outputs 
-     * even after {@link #maxFetchRetriesPerMap} retries.
-     */
-    Set<TaskID> fetchFailedMaps = new TreeSet<TaskID>(); 
-    
-    /**
-     * A map of taskId -> no. of failed fetches
-     */
-    Map<TaskAttemptID, Integer> mapTaskToFailedFetchesMap = 
-      new HashMap<TaskAttemptID, Integer>();    
-
-    /**
-     * Initial backoff interval (milliseconds)
-     */
-    private static final int BACKOFF_INIT = 4000; 
-    
-    /**
-     * The interval for logging in the shuffle
-     */
-    private static final int MIN_LOG_TIME = 60000;
-
-    /** 
-     * List of in-memory map-outputs.
-     */
-    private final List<MapOutput> mapOutputsFilesInMemory =
-      Collections.synchronizedList(new LinkedList<MapOutput>());
-    
-    /**
-     * The map for (Hosts, List of MapIds from this Host) maintaining
-     * map output locations
-     */
-    private final Map<String, List<MapOutputLocation>> mapLocations = 
-      new ConcurrentHashMap<String, List<MapOutputLocation>>();
-    
-    /**
-     * This class contains the methods that should be used for metrics-reporting
-     * the specific metrics for shuffle. This class actually reports the
-     * metrics for the shuffle client (the ReduceTask), and hence the name
-     * ShuffleClientMetrics.
-     */
-    class ShuffleClientMetrics implements Updater {
-      private MetricsRecord shuffleMetrics = null;
-      private int numFailedFetches = 0;
-      private int numSuccessFetches = 0;
-      private long numBytes = 0;
-      private int numThreadsBusy = 0;
-      ShuffleClientMetrics(JobConf conf) {
-        MetricsContext metricsContext = MetricsUtil.getContext("mapred");
-        this.shuffleMetrics = 
-          MetricsUtil.createRecord(metricsContext, "shuffleInput");
-        this.shuffleMetrics.setTag("user", conf.getUser());
-        this.shuffleMetrics.setTag("jobName", conf.getJobName());
-        this.shuffleMetrics.setTag("jobId", ReduceTask.this.getJobID().toString());
-        this.shuffleMetrics.setTag("taskId", getTaskID().toString());
-        this.shuffleMetrics.setTag("sessionId", conf.getSessionId());
-        metricsContext.registerUpdater(this);
-      }
-      public synchronized void inputBytes(long numBytes) {
-        this.numBytes += numBytes;
-      }
-      public synchronized void failedFetch() {
-        ++numFailedFetches;
-      }
-      public synchronized void successFetch() {
-        ++numSuccessFetches;
-      }
-      public synchronized void threadBusy() {
-        ++numThreadsBusy;
-      }
-      public synchronized void threadFree() {
-        --numThreadsBusy;
-      }
-      public void doUpdates(MetricsContext unused) {
-        synchronized (this) {
-          shuffleMetrics.incrMetric("shuffle_input_bytes", numBytes);
-          shuffleMetrics.incrMetric("shuffle_failed_fetches", 
-                                    numFailedFetches);
-          shuffleMetrics.incrMetric("shuffle_success_fetches", 
-                                    numSuccessFetches);
-          if (numCopiers != 0) {
-            shuffleMetrics.setMetric("shuffle_fetchers_busy_percent",
-                100*((float)numThreadsBusy/numCopiers));
-          } else {
-            shuffleMetrics.setMetric("shuffle_fetchers_busy_percent", 0);
-          }
-          numBytes = 0;
-          numSuccessFetches = 0;
-          numFailedFetches = 0;
-        }
-        shuffleMetrics.update();
-      }
-    }
-
-    /** Represents the result of an attempt to copy a map output */
-    private class CopyResult {
-      
-      // the map output location against which a copy attempt was made
-      private final MapOutputLocation loc;
-      
-      // the size of the file copied, -1 if the transfer failed
-      private final long size;
-      
-      //a flag signifying whether a copy result is obsolete
-      private static final int OBSOLETE = -2;
-      
-      private CopyOutputErrorType error = CopyOutputErrorType.NO_ERROR;
-      CopyResult(MapOutputLocation loc, long size) {
-        this.loc = loc;
-        this.size = size;
-      }
-
-      CopyResult(MapOutputLocation loc, long size, CopyOutputErrorType error) {
-        this.loc = loc;
-        this.size = size;
-        this.error = error;
-      }
-
-      public boolean getSuccess() { return size >= 0; }
-      public boolean isObsolete() { 
-        return size == OBSOLETE;
-      }
-      public long getSize() { return size; }
-      public String getHost() { return loc.getHost(); }
-      public MapOutputLocation getLocation() { return loc; }
-      public CopyOutputErrorType getError() { return error; }
-    }
-    
-    private int nextMapOutputCopierId = 0;
-    
-    /**
-     * Abstraction to track a map-output.
-     */
-    private class MapOutputLocation {
-      TaskAttemptID taskAttemptId;
-      TaskID taskId;
-      String ttHost;
-      URL taskOutput;
-      
-      public MapOutputLocation(TaskAttemptID taskAttemptId, 
-                               String ttHost, URL taskOutput) {
-        this.taskAttemptId = taskAttemptId;
-        this.taskId = this.taskAttemptId.getTaskID();
-        this.ttHost = ttHost;
-        this.taskOutput = taskOutput;
-      }
-      
-      public TaskAttemptID getTaskAttemptId() {
-        return taskAttemptId;
-      }
-      
-      public TaskID getTaskId() {
-        return taskId;
-      }
-      
-      public String getHost() {
-        return ttHost;
-      }
-      
-      public URL getOutputLocation() {
-        return taskOutput;
-      }
-    }
-    
-    /** Describes the output of a map; could either be on disk or in-memory. */
-    private class MapOutput {
-      final TaskID mapId;
-      final TaskAttemptID mapAttemptId;
-      
-      final Path file;
-      final Configuration conf;
-      
-      byte[] data;
-      final boolean inMemory;
-      long compressedSize;
-      
-      public MapOutput(TaskID mapId, TaskAttemptID mapAttemptId, 
-                       Configuration conf, Path file, long size) {
-        this.mapId = mapId;
-        this.mapAttemptId = mapAttemptId;
-        
-        this.conf = conf;
-        this.file = file;
-        this.compressedSize = size;
-        
-        this.data = null;
-        
-        this.inMemory = false;
-      }
-      
-      public MapOutput(TaskID mapId, TaskAttemptID mapAttemptId, byte[] data, int compressedLength) {
-        this.mapId = mapId;
-        this.mapAttemptId = mapAttemptId;
-        
-        this.file = null;
-        this.conf = null;
-        
-        this.data = data;
-        this.compressedSize = compressedLength;
-        
-        this.inMemory = true;
-      }
-      
-      public void discard() throws IOException {
-        if (inMemory) {
-          data = null;
-        } else {
-          FileSystem fs = file.getFileSystem(conf);
-          fs.delete(file, true);
-        }
-      }
-    }
-    
-    class ShuffleRamManager implements RamManager {
-      /* Maximum percentage of the in-memory limit that a single shuffle can 
-       * consume*/ 
-      private static final float MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION = 0.25f;
-      
-      /* Maximum percentage of shuffle-threads which can be stalled 
-       * simultaneously after which a merge is triggered. */ 
-      private static final float MAX_STALLED_SHUFFLE_THREADS_FRACTION = 0.75f;
-      
-      private final int maxSize;
-      private final int maxSingleShuffleLimit;
-      
-      private int size = 0;
-      
-      private Object dataAvailable = new Object();
-      private int fullSize = 0;
-      private int numPendingRequests = 0;
-      private int numRequiredMapOutputs = 0;
-      private int numClosed = 0;
-      private boolean closed = false;
-      
-      public ShuffleRamManager(Configuration conf) throws IOException {
-        final float maxInMemCopyUse =
-          conf.getFloat("mapred.job.shuffle.input.buffer.percent", 0.70f);
-        if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
-          throw new IOException("mapred.job.shuffle.input.buffer.percent" +
-                                maxInMemCopyUse);
-        }
-        // Allow unit tests to fix Runtime memory
-        maxSize = (int)(conf.getInt("mapred.job.reduce.total.mem.bytes",
-            (int)Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE))
-          * maxInMemCopyUse);
-        maxSingleShuffleLimit = (int)(maxSize * MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION);
-        LOG.info("ShuffleRamManager: MemoryLimit=" + maxSize + 
-                 ", MaxSingleShuffleLimit=" + maxSingleShuffleLimit);
-      }
-      
-      public synchronized boolean reserve(int requestedSize, InputStream in) 
-      throws InterruptedException {
-        // Wait till the request can be fulfilled...
-        while ((size + requestedSize) > maxSize) {
-          
-          // Close the input...
-          if (in != null) {
-            try {
-              in.close();
-            } catch (IOException ie) {
-              LOG.info("Failed to close connection with: " + ie);
-            } finally {
-              in = null;
-            }
-          } 
-
-          // Track pending requests
-          synchronized (dataAvailable) {
-            ++numPendingRequests;
-            dataAvailable.notify();
-          }
-
-          // Wait for memory to free up
-          wait();
-          
-          // Track pending requests
-          synchronized (dataAvailable) {
-            --numPendingRequests;
-          }
-        }
-        
-        size += requestedSize;
-        
-        return (in != null);
-      }
-      
-      public synchronized void unreserve(int requestedSize) {
-        size -= requestedSize;
-        
-        synchronized (dataAvailable) {
-          fullSize -= requestedSize;
-          --numClosed;
-        }
-        
-        // Notify the threads blocked on RamManager.reserve
-        notifyAll();
-      }
-      
-      private synchronized int getNumPendingRequests() {
-        return numPendingRequests;
-      }
-      
-      public boolean waitForDataToMerge() throws InterruptedException {
-        boolean done = false;
-        synchronized (dataAvailable) {
-                 // Start in-memory merge if manager has been closed or...
-          while (!closed
-                 &&
-                 // In-memory threshold exceeded and at least two segments
-                 // have been fetched
-                 (getPercentUsed() < maxInMemCopyPer || numClosed < 2)
-                 &&
-                 // More than "mapred.inmem.merge.threshold" map outputs
-                 // have been fetched into memory
-                 (maxInMemOutputs <= 0 || numClosed < maxInMemOutputs)
-                 && 
-                 // More than MAX... threads are blocked on the RamManager
-                 // or the blocked threads are the last map outputs to be
-                 // fetched. If numRequiredMapOutputs is zero, either
-                 // setNumCopiedMapOutputs has not been called (no map outputs
-                 // have been fetched, so there is nothing to merge) or the
-                 // last map outputs being transferred without
-                 // contention, so a merge would be premature.
-                 (getNumPendingRequests() < 
-                      numCopiers*MAX_STALLED_SHUFFLE_THREADS_FRACTION && 
-                  (0 == numRequiredMapOutputs ||
-                   getNumPendingRequests() < numRequiredMapOutputs))) {
-            dataAvailable.wait();
-          }
-          done = closed;
-        }
-        return done;
-      }
-      
-      public void closeInMemoryFile(int requestedSize) {
-        synchronized (dataAvailable) {
-          fullSize += requestedSize;
-          ++numClosed;
-          dataAvailable.notify();
-        }
-      }
-      
-      public void setNumCopiedMapOutputs(int numRequiredMapOutputs) {
-        synchronized (dataAvailable) {
-          this.numRequiredMapOutputs = numRequiredMapOutputs;
-          dataAvailable.notify();
-        }
-      }
-      
-      public void close() {
-        synchronized (dataAvailable) {
-          closed = true;
-          LOG.info("Closed ram manager");
-          dataAvailable.notify();
-        }
-      }
-      
-      private float getPercentUsed() {
-        return (float)fullSize/maxSize;
-      }
-
-      int getMemoryLimit() {
-        return maxSize;
-      }
-      
-      boolean canFitInMemory(long requestedSize) {
-        return (requestedSize < Integer.MAX_VALUE && 
-                requestedSize < maxSingleShuffleLimit);
-      }
-    }
-
-    /** Copies map outputs as they become available */
-    private class MapOutputCopier extends Thread {
-      // basic/unit connection timeout (in milliseconds)
-      private final static int UNIT_CONNECT_TIMEOUT = 30 * 1000;
-      // default read timeout (in milliseconds)
-      private final static int DEFAULT_READ_TIMEOUT = 3 * 60 * 1000;
-      private final int shuffleConnectionTimeout;
-      private final int shuffleReadTimeout;
-
-      private MapOutputLocation currentLocation = null;
-      private int id = nextMapOutputCopierId++;
-      private Reporter reporter;
-      private boolean readError = false;
-      
-      // Decompression of map-outputs
-      private CompressionCodec codec = null;
-      private Decompressor decompressor = null;
-      
-      public MapOutputCopier(JobConf job, Reporter reporter) {
-        setName("MapOutputCopier " + reduceTask.getTaskID() + "." + id);
-        LOG.debug(getName() + " created");
-        this.reporter = reporter;
-        
-        shuffleConnectionTimeout =
-          job.getInt("mapred.shuffle.connect.timeout", STALLED_COPY_TIMEOUT);
-        shuffleReadTimeout =
-          job.getInt("mapred.shuffle.read.timeout", DEFAULT_READ_TIMEOUT);
-        
-        if (job.getCompressMapOutput()) {
-          Class<? extends CompressionCodec> codecClass =
-            job.getMapOutputCompressorClass(DefaultCodec.class);
-          codec = ReflectionUtils.newInstance(codecClass, job);
-          decompressor = CodecPool.getDecompressor(codec);
-        }
-      }
-      
-      /**
-       * Fail the current file that we are fetching
-       * @return were we currently fetching?
-       */
-      public synchronized boolean fail() {
-        if (currentLocation != null) {
-          finish(-1, CopyOutputErrorType.OTHER_ERROR);
-          return true;
-        } else {
-          return false;
-        }
-      }
-      
-      /**
-       * Get the current map output location.
-       */
-      public synchronized MapOutputLocation getLocation() {
-        return currentLocation;
-      }
-      
-      private synchronized void start(MapOutputLocation loc) {
-        currentLocation = loc;
-      }
-      
-      private synchronized void finish(long size, CopyOutputErrorType error) {
-        if (currentLocation != null) {
-          LOG.debug(getName() + " finishing " + currentLocation + " =" + size);
-          synchronized (copyResults) {
-            copyResults.add(new CopyResult(currentLocation, size, error));
-            copyResults.notify();
-          }
-          currentLocation = null;
-        }
-      }
-      
-      /** Loop forever and fetch map outputs as they become available.
-       * The thread exits when it is interrupted by {@link ReduceTaskRunner}
-       */
-      @Override
-      public void run() {
-        while (true) {        
-          try {
-            MapOutputLocation loc = null;
-            long size = -1;
-            
-            synchronized (scheduledCopies) {
-              while (scheduledCopies.isEmpty()) {
-                scheduledCopies.wait();
-              }
-              loc = scheduledCopies.remove(0);
-            }
-            CopyOutputErrorType error = CopyOutputErrorType.OTHER_ERROR;
-            readError = false;
-            try {
-              shuffleClientMetrics.threadBusy();
-              start(loc);
-              size = copyOutput(loc);
-              shuffleClientMetrics.successFetch();
-              error = CopyOutputErrorType.NO_ERROR;
-            } catch (IOException e) {
-              LOG.warn(reduceTask.getTaskID() + " copy failed: " +
-                       loc.getTaskAttemptId() + " from " + loc.getHost());
-              LOG.warn(StringUtils.stringifyException(e));
-              shuffleClientMetrics.failedFetch();
-              if (readError) {
-                error = CopyOutputErrorType.READ_ERROR;
-              }
-              // Reset 
-              size = -1;
-            } finally {
-              shuffleClientMetrics.threadFree();
-              finish(size, error);
-            }
-          } catch (InterruptedException e) { 
-            break; // ALL DONE
-          } catch (FSError e) {
-            LOG.error("Task: " + reduceTask.getTaskID() + " - FSError: " + 
-                      StringUtils.stringifyException(e));
-            try {
-              umbilical.fsError(reduceTask.getTaskID(), e.getMessage());
-            } catch (IOException io) {
-              LOG.error("Could not notify TT of FSError: " + 
-                      StringUtils.stringifyException(io));
-            }
-          } catch (Throwable th) {
-            String msg = getTaskID() + " : Map output copy failure : " 
-                         + StringUtils.stringifyException(th);
-            reportFatalError(getTaskID(), th, msg);
-          }
-        }
-        
-        if (decompressor != null) {
-          CodecPool.returnDecompressor(decompressor);
-        }
-          
-      }
-      
-      /** Copies a map output from a remote host, via HTTP. 
-       * @param currentLocation the map output location to be copied
-       * @return the path (fully qualified) of the copied file
-       * @throws IOException if there is an error copying the file
-       * @throws InterruptedException if the copier should give up
-       */
-      private long copyOutput(MapOutputLocation loc
-                              ) throws IOException, InterruptedException {
-        // check if we still need to copy the output from this location
-        if (copiedMapOutputs.contains(loc.getTaskId()) || 
-            obsoleteMapIds.contains(loc.getTaskAttemptId())) {
-          return CopyResult.OBSOLETE;
-        } 
- 
-        // a temp filename. If this file gets created in ramfs, we're fine,
-        // else, we will check the localFS to find a suitable final location
-        // for this path
-        TaskAttemptID reduceId = reduceTask.getTaskID();
-        Path filename =
-            new Path(String.format(
-                MapOutputFile.REDUCE_INPUT_FILE_FORMAT_STRING,
-                TaskTracker.OUTPUT, loc.getTaskId().getId()));
-
-        // Copy the map output to a temp file whose name is unique to this attempt 
-        Path tmpMapOutput = new Path(filename+"-"+id);
-        
-        // Copy the map output
-        MapOutput mapOutput = getMapOutput(loc, tmpMapOutput,
-                                           reduceId.getTaskID().getId());
-        if (mapOutput == null) {
-          throw new IOException("Failed to fetch map-output for " + 
-                                loc.getTaskAttemptId() + " from " + 
-                                loc.getHost());
-        }
-        
-        // The size of the map-output
-        long bytes = mapOutput.compressedSize;
-        
-        // lock the ReduceTask while we do the rename
-        synchronized (ReduceTask.this) {
-          if (copiedMapOutputs.contains(loc.getTaskId())) {
-            mapOutput.discard();
-            return CopyResult.OBSOLETE;
-          }
-
-          // Special case: discard empty map-outputs
-          if (bytes == 0) {
-            try {
-              mapOutput.discard();
-            } catch (IOException ioe) {
-              LOG.info("Couldn't discard output of " + loc.getTaskId());
-            }
-            
-            // Note that we successfully copied the map-output
-            noteCopiedMapOutput(loc.getTaskId());
-            
-            return bytes;
-          }
-          
-          // Process map-output
-          if (mapOutput.inMemory) {
-            // Save it in the synchronized list of map-outputs
-            mapOutputsFilesInMemory.add(mapOutput);
-          } else {
-            // Rename the temporary file to the final file; 
-            // ensure it is on the same partition
-            tmpMapOutput = mapOutput.file;
-            filename = new Path(tmpMapOutput.getParent(), filename.getName());
-            if (!localFileSys.rename(tmpMapOutput, filename)) {
-              localFileSys.delete(tmpMapOutput, true);
-              throw new IOException("Failed to rename map output " + 
-                  tmpMapOutput + " to " + filename);
-            }
-
-            synchronized (mapOutputFilesOnDisk) {        
-              addToMapOutputFilesOnDisk(localFileSys.getFileStatus(filename));
-            }
-          }
-
-          // Note that we successfully copied the map-output
-          noteCopiedMapOutput(loc.getTaskId());
-        }
-        
-        return bytes;
-      }
-      
-      /**
-       * Save the map taskid whose output we just copied.
-       * This function assumes that it has been synchronized on ReduceTask.this.
-       * 
-       * @param taskId map taskid
-       */
-      private void noteCopiedMapOutput(TaskID taskId) {
-        copiedMapOutputs.add(taskId);
-        ramManager.setNumCopiedMapOutputs(numMaps - copiedMapOutputs.size());
-      }
-
-      /**
-       * Get the map output into a local file (either in the inmemory fs or on the 
-       * local fs) from the remote server.
-       * We use the file system so that we generate checksum files on the data.
-       * @param mapOutputLoc map-output to be fetched
-       * @param filename the filename to write the data into
-       * @param connectionTimeout number of milliseconds for connection timeout
-       * @param readTimeout number of milliseconds for read timeout
-       * @return the path of the file that got created
-       * @throws IOException when something goes wrong
-       */
-      private MapOutput getMapOutput(MapOutputLocation mapOutputLoc, 
-                                     Path filename, int reduce)
-      throws IOException, InterruptedException {
-        // Connect
-        URLConnection connection = 
-          mapOutputLoc.getOutputLocation().openConnection();
-        InputStream input = getInputStream(connection, shuffleConnectionTimeout,
-                                           shuffleReadTimeout); 
-
-        // Validate header from map output
-        TaskAttemptID mapId = null;
-        try {
-          mapId =
-            TaskAttemptID.forName(connection.getHeaderField(FROM_MAP_TASK));
-        } catch (IllegalArgumentException ia) {
-          LOG.warn("Invalid map id ", ia);
-          return null;
-        }
-        TaskAttemptID expectedMapId = mapOutputLoc.getTaskAttemptId();
-        if (!mapId.equals(expectedMapId)) {
-          LOG.warn("data from wrong map:" + mapId +
-              " arrived to reduce task " + reduce +
-              ", where as expected map output should be from " + expectedMapId);
-          return null;
-        }
-
-        long decompressedLength =
-          Long.parseLong(connection.getHeaderField(RAW_MAP_OUTPUT_LENGTH));
-        long compressedLength =
-          Long.parseLong(connection.getHeaderField(MAP_OUTPUT_LENGTH));
-
-        if (compressedLength < 0 || decompressedLength < 0) {
-          LOG.warn(getName() + " invalid lengths in map output header: id: " +
-              mapId + " compressed len: " + compressedLength +
-              ", decompressed len: " + decompressedLength);
-          return null;
-        }
-        int forReduce =
-          (int)Integer.parseInt(connection.getHeaderField(FOR_REDUCE_TASK));
-
-        if (forReduce != reduce) {
-          LOG.warn("data for the wrong reduce: " + forReduce +
-              " with compressed len: " + compressedLength +
-              ", decompressed len: " + decompressedLength +
-              " arrived to reduce task " + reduce);
-          return null;
-        }
-        LOG.info("header: " + mapId + ", compressed len: " + compressedLength +
-            ", decompressed len: " + decompressedLength);
-
-        //We will put a file in memory if it meets certain criteria:
-        //1. The size of the (decompressed) file should be less than 25% of 
-        //    the total inmem fs
-        //2. There is space available in the inmem fs
-
-        // Check if this map-output can be saved in-memory
-        boolean shuffleInMemory = ramManager.canFitInMemory(decompressedLength); 
-
-        // Shuffle
-        MapOutput mapOutput = null;
-        if (shuffleInMemory) { 
-          LOG.info("Shuffling " + decompressedLength + " bytes (" + 
-              compressedLength + " raw bytes) " + 
-              "into RAM from " + mapOutputLoc.getTaskAttemptId());
-
-          mapOutput = shuffleInMemory(mapOutputLoc, connection, input,
-                                      (int)decompressedLength,
-                                      (int)compressedLength);
-        } else {
-          LOG.info("Shuffling " + decompressedLength + " bytes (" + 
-              compressedLength + " raw bytes) " + 
-              "into Local-FS from " + mapOutputLoc.getTaskAttemptId());
-
-          mapOutput = shuffleToDisk(mapOutputLoc, input, filename, 
-              compressedLength);
-        }
-            
-        return mapOutput;
-      }
-
-      /** 
-       * The connection establishment is attempted multiple times and is given up 
-       * only on the last failure. Instead of connecting with a timeout of 
-       * X, we try connecting with a timeout of x < X but multiple times. 
-       */
-      private InputStream getInputStream(URLConnection connection, 
-                                         int connectionTimeout, 
-                                         int readTimeout) 
-      throws IOException {
-        int unit = 0;
-        if (connectionTimeout < 0) {
-          throw new IOException("Invalid timeout "
-                                + "[timeout = " + connectionTimeout + " ms]");
-        } else if (connectionTimeout > 0) {
-          unit = (UNIT_CONNECT_TIMEOUT > connectionTimeout)
-                 ? connectionTimeout
-                 : UNIT_CONNECT_TIMEOUT;
-        }
-        // set the read timeout to the total timeout
-        connection.setReadTimeout(readTimeout);
-        // set the connect timeout to the unit-connect-timeout
-        connection.setConnectTimeout(unit);
-        while (true) {
-          try {
-            connection.connect();
-            break;
-          } catch (IOException ioe) {
-            // update the total remaining connect-timeout
-            connectionTimeout -= unit;
-
-            // throw an exception if we have waited for timeout amount of time
-            // note that the updated value of timeout is used here
-            if (connectionTimeout == 0) {
-              throw ioe;
-            }
-
-            // reset the connect timeout for the last try
-            if (connectionTimeout < unit) {
-              unit = connectionTimeout;
-              // reset the connect time out for the final connect
-              connection.setConnectTimeout(unit);
-            }
-          }
-        }
-        try {
-          return connection.getInputStream();
-        } catch (IOException ioe) {
-          readError = true;
-          throw ioe;
-        }
-      }
-
-      private MapOutput shuffleInMemory(MapOutputLocation mapOutputLoc,
-                                        URLConnection connection, 
-                                        InputStream input,
-                                        int mapOutputLength,
-                                        int compressedLength)
-      throws IOException, InterruptedException {
-        // Reserve ram for the map-output
-        boolean createdNow = ramManager.reserve(mapOutputLength, input);
-      
-        // Reconnect if we need to
-        if (!createdNow) {
-          // Reconnect
-          try {
-            connection = mapOutputLoc.getOutputLocation().openConnection();
-            input = getInputStream(connection, shuffleConnectionTimeout, 
-                                   shuffleReadTimeout);
-          } catch (IOException ioe) {
-            LOG.info("Failed reopen connection to fetch map-output from " + 
-                     mapOutputLoc.getHost());
-            
-            // Inform the ram-manager
-            ramManager.closeInMemoryFile(mapOutputLength);
-            ramManager.unreserve(mapOutputLength);
-            
-            throw ioe;
-          }
-        }
-
-        IFileInputStream checksumIn = 
-          new IFileInputStream(input,compressedLength);
-
-        input = checksumIn;       
-      
-        // Are map-outputs compressed?
-        if (codec != null) {
-          decompressor.reset();
-          input = codec.createInputStream(input, decompressor);
-        }
-      
-        // Copy map-output into an in-memory buffer
-        byte[] shuffleData = new byte[mapOutputLength];
-        MapOutput mapOutput = 
-          new MapOutput(mapOutputLoc.getTaskId(), 
-                        mapOutputLoc.getTaskAttemptId(), shuffleData, compressedLength);
-        
-        int bytesRead = 0;
-        try {
-          int n = input.read(shuffleData, 0, shuffleData.length);
-          while (n > 0) {
-            bytesRead += n;
-            shuffleClientMetrics.inputBytes(n);
-
-            // indicate we're making progress
-            reporter.progress();
-            n = input.read(shuffleData, bytesRead, 
-                           (shuffleData.length-bytesRead));
-          }
-
-          LOG.info("Read " + bytesRead + " bytes from map-output for " +
-                   mapOutputLoc.getTaskAttemptId());
-
-          input.close();
-        } catch (IOException ioe) {
-          LOG.info("Failed to shuffle from " + mapOutputLoc.getTaskAttemptId(), 
-                   ioe);
-
-          // Inform the ram-manager
-          ramManager.closeInMemoryFile(mapOutputLength);
-          ramManager.unreserve(mapOutputLength);
-          
-          // Discard the map-output
-          try {
-            mapOutput.discard();
-          } catch (IOException ignored) {
-            LOG.info("Failed to discard map-output from " + 
-                     mapOutputLoc.getTaskAttemptId(), ignored);
-          }
-          mapOutput = null;
-          
-          // Close the streams
-          IOUtils.cleanup(LOG, input);
-
-          // Re-throw
-          readError = true;
-          throw ioe;
-        }
-
-        // Close the in-memory file
-        ramManager.closeInMemoryFile(mapOutputLength);
-
-        // Sanity check
-        if (bytesRead != mapOutputLength) {
-          // Inform the ram-manager
-          ramManager.unreserve(mapOutputLength);
-          
-          // Discard the map-output
-          try {
-            mapOutput.discard();
-          } catch (IOException ignored) {
-            // IGNORED because we are cleaning up
-            LOG.info("Failed to discard map-output from " + 
-                     mapOutputLoc.getTaskAttemptId(), ignored);
-          }
-          mapOutput = null;
-
-          throw new IOException("Incomplete map output received for " +
-                                mapOutputLoc.getTaskAttemptId() + " from " +
-                                mapOutputLoc.getOutputLocation() + " (" + 
-                                bytesRead + " instead of " + 
-                                mapOutputLength + ")"
-          );
-        }
-
-        // TODO: Remove this after a 'fix' for HADOOP-3647
-        if (mapOutputLength > 0) {
-          DataInputBuffer dib = new DataInputBuffer();
-          dib.reset(shuffleData, 0, shuffleData.length);
-          LOG.info("Rec #1 from " + mapOutputLoc.getTaskAttemptId() + " -> (" + 
-                   WritableUtils.readVInt(dib) + ", " + 
-                   WritableUtils.readVInt(dib) + ") from " + 
-                   mapOutputLoc.getHost());
-        }
-        
-        return mapOutput;
-      }
-      
-      private MapOutput shuffleToDisk(MapOutputLocation mapOutputLoc,
-                                      InputStream input,
-                                      Path filename,
-                                      long mapOutputLength) 
-      throws IOException {
-        // Find out a suitable location for the output on local-filesystem
-        Path localFilename = 
-          lDirAlloc.getLocalPathForWrite(filename.toUri().getPath(), 
-                                         mapOutputLength, conf);
-
-        MapOutput mapOutput = 
-          new MapOutput(mapOutputLoc.getTaskId(), mapOutputLoc.getTaskAttemptId(), 
-                        conf, localFileSys.makeQualified(localFilename), 
-                        mapOutputLength);
-
-
-        // Copy data to local-disk
-        OutputStream output = null;
-        long bytesRead = 0;
-        try {
-          output = rfs.create(localFilename);
-          
-          byte[] buf = new byte[64 * 1024];
-          int n = -1;
-          try {
-            n = input.read(buf, 0, buf.length);
-          } catch (IOException ioe) {
-            readError = true;
-            throw ioe;
-          }
-          while (n > 0) {
-            bytesRead += n;
-            shuffleClientMetrics.inputBytes(n);
-            output.write(buf, 0, n);
-
-            // indicate we're making progress
-            reporter.progress();
-            try {
-              n = input.read(buf, 0, buf.length);
-            } catch (IOException ioe) {
-              readError = true;
-              throw ioe;
-            }
-          }
-
-          LOG.info("Read " + bytesRead + " bytes from map-output for " +
-              mapOutputLoc.getTaskAttemptId());
-
-          output.close();
-          input.close();
-        } catch (IOException ioe) {
-          LOG.info("Failed to shuffle from " + mapOutputLoc.getTaskAttemptId(), 
-                   ioe);
-
-          // Discard the map-output
-          try {
-            mapOutput.discard();
-          } catch (IOException ignored) {
-            LOG.info("Failed to discard map-output from " + 
-                mapOutputLoc.getTaskAttemptId(), ignored);
-          }
-          mapOutput = null;
-
-          // Close the streams
-          IOUtils.cleanup(LOG, input, output);
-
-          // Re-throw
-          throw ioe;
-        }
-
-        // Sanity check
-        if (bytesRead != mapOutputLength) {
-          try {
-            mapOutput.discard();
-          } catch (Exception ioe) {
-            // IGNORED because we are cleaning up
-            LOG.info("Failed to discard map-output from " + 
-                mapOutputLoc.getTaskAttemptId(), ioe);
-          } catch (Throwable t) {
-            String msg = getTaskID() + " : Failed in shuffle to disk :" 
-                         + StringUtils.stringifyException(t);
-            reportFatalError(getTaskID(), t, msg);
-          }
-          mapOutput = null;
-
-          throw new IOException("Incomplete map output received for " +
-                                mapOutputLoc.getTaskAttemptId() + " from " +
-                                mapOutputLoc.getOutputLocation() + " (" + 
-                                bytesRead + " instead of " + 
-                                mapOutputLength + ")"
-          );
-        }
-
-        return mapOutput;
-
-      }
-      
-    } // MapOutputCopier
-    
-    private void configureClasspath(JobConf conf)
-      throws IOException {
-      
-      // get the task and the current classloader which will become the parent
-      Task task = ReduceTask.this;
-      ClassLoader parent = conf.getClassLoader();   
-      
-      // get the work directory which holds the elements we are dynamically
-      // adding to the classpath
-      File workDir = new File(task.getJobFile()).getParentFile();
-      ArrayList<URL> urllist = new ArrayList<URL>();
-      
-      // add the jars and directories to the classpath
-      String jar = conf.getJar();
-      if (jar != null) {      
-        File jobCacheDir = new File(new Path(jar).getParent().toString());
-
-        File[] libs = new File(jobCacheDir, "lib").listFiles();
-        if (libs != null) {
-          for (int i = 0; i < libs.length; i++) {
-            urllist.add(libs[i].toURL());
-          }
-        }
-        urllist.add(new File(jobCacheDir, "classes").toURL());
-        urllist.add(jobCacheDir.toURL());
-        
-      }
-      urllist.add(workDir.toURL());
-      
-      // create a new classloader with the old classloader as its parent
-      // then set that classloader as the one used by the current jobconf
-      URL[] urls = urllist.toArray(new URL[urllist.size()]);
-      URLClassLoader loader = new URLClassLoader(urls, parent);
-      conf.setClassLoader(loader);
-    }
-    
-    public ReduceCopier(TaskUmbilicalProtocol umbilical, JobConf conf,
-                        TaskReporter reporter
-                        )throws ClassNotFoundException, IOException {
-      
-      configureClasspath(conf);
-      this.reporter = reporter;
-      this.shuffleClientMetrics = new ShuffleClientMetrics(conf);
-      this.umbilical = umbilical;      
-      this.reduceTask = ReduceTask.this;
-
-      this.scheduledCopies = new ArrayList<MapOutputLocation>(100);
-      this.copyResults = new ArrayList<CopyResult>(100);    
-      this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5);
-      this.maxInFlight = 4 * numCopiers;
-      this.maxBackoff = conf.getInt("mapred.reduce.copy.backoff", 300);
-      Counters.Counter combineInputCounter = 
-        reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
-      this.combinerRunner = CombinerRunner.create(conf, getTaskID(),
-                                                  combineInputCounter,
-                                                  reporter, null);
-      if (combinerRunner != null) {
-        combineCollector = 
-          new CombineOutputCollector(reduceCombineOutputCounter);
-      }
-      
-      this.ioSortFactor = conf.getInt("io.sort.factor", 10);
-      // the exponential backoff formula
-      //    backoff (t) = init * base^(t-1)
-      // so for max retries we get
-      //    backoff(1) + .... + backoff(max_fetch_retries) ~ max
-      // solving which we get
-      //    max_fetch_retries ~ log((max * (base - 1) / init) + 1) / log(base)
-      // for the default value of max = 300 (5min) we get max_fetch_retries = 6
-      // the order is 4,8,16,32,64,128, the sum of which is 252 sec = 4.2 min
-      
-      // optimizing for the base 2
-      this.maxFetchRetriesPerMap = Math.max(MIN_FETCH_RETRIES_PER_MAP, 
-             getClosestPowerOf2((this.maxBackoff * 1000 / BACKOFF_INIT) + 1));
-      this.maxFailedUniqueFetches = Math.min(numMaps, 
-                                             this.maxFailedUniqueFetches);
-      this.maxInMemOutputs = conf.getInt("mapred.inmem.merge.threshold", 1000);
-      this.maxInMemCopyPer =
-        conf.getFloat("mapred.job.shuffle.merge.percent", 0.66f);
-      final float maxRedPer =
-        conf.getFloat("mapred.job.reduce.input.buffer.percent", 0f);
-      if (maxRedPer > 1.0 || maxRedPer < 0.0) {
-        throw new IOException("mapred.job.reduce.input.buffer.percent" +
-                              maxRedPer);
-      }
-      this.maxInMemReduce = (int)Math.min(
-          Runtime.getRuntime().maxMemory() * maxRedPer, Integer.MAX_VALUE);
-
-      // Setup the RamManager
-      ramManager = new ShuffleRamManager(conf);
-
-      localFileSys = FileSystem.getLocal(conf);
-
-      rfs = ((LocalFileSystem)localFileSys).getRaw();
-
-      // hosts -> next contact time
-      this.penaltyBox = new LinkedHashMap<String, Long>();
-      
-      // hostnames
-      this.uniqueHosts = new HashSet<String>();
-      
-      // Seed the random number generator with a reasonably globally unique seed
-      long randomSeed = System.nanoTime() + 
-                        (long)Math.pow(this.reduceTask.getPartition(),
-                                       (this.reduceTask.getPartition()%10)
-                                      );
-      this.random = new Random(randomSeed);
-      this.maxMapRuntime = 0;
-    }
-    
-    private boolean busyEnough(int numInFlight) {
-      return numInFlight > maxInFlight;
-    }
-    
-    
-    public boolean fetchOutputs() throws IOException {
-      int totalFailures = 0;
-      int            numInFlight = 0, numCopied = 0;
-      DecimalFormat  mbpsFormat = new DecimalFormat("0.00");
-      final Progress copyPhase = 
-        reduceTask.getProgress().phase();
-      LocalFSMerger localFSMergerThread = null;
-      InMemFSMergeThread inMemFSMergeThread = null;
-      GetMapEventsThread getMapEventsThread = null;
-      
-      copyPhase.addPhases(numMaps); // add sub-phase per file
-      
-      copiers = new ArrayList<MapOutputCopier>(numCopiers);
-      
-      // start all the copying threads
-      for (int i=0; i < numCopiers; i++) {
-        MapOutputCopier copier = new MapOutputCopier(conf, reporter);
-        copiers.add(copier);
-        copier.start();
-      }
-      
-      //start the on-disk-merge thread
-      localFSMergerThread = new LocalFSMerger((LocalFileSystem)localFileSys);
-      //start the in memory merger thread
-      inMemFSMergeThread = new InMemFSMergeThread();
-      localFSMergerThread.start();
-      inMemFSMergeThread.start();
-      
-      // start the map events thread
-      getMapEventsThread = new GetMapEventsThread();
-      getMapEventsThread.start();
-      
-      // start the clock for bandwidth measurement
-      long startTime = System.currentTimeMillis();
-      long currentTime = startTime;
-      long lastProgressTime = startTime;
-      long lastOutputTime = 0;
-      
-        // loop until we get all required outputs
-        while (copiedMapOutputs.size() < numMaps && mergeThrowable == null) {
-          
-          currentTime = System.currentTimeMillis();
-          boolean logNow = false;
-          if (currentTime - lastOutputTime > MIN_LOG_TIME) {
-            lastOutputTime = currentTime;
-            logNow = true;
-          }
-          if (logNow) {
-            LOG.info(reduceTask.getTaskID() + " Need another " 
-                   + (numMaps - copiedMapOutputs.size()) + " map output(s) "
-                   + "where " + numInFlight + " is already in progress");
-          }
-
-          // Re-queue the locations of the failed fetches.
-          Iterator<MapOutputLocation> locItr = retryFetches.iterator();
-
-          while (locItr.hasNext()) {
-            MapOutputLocation loc = locItr.next(); 
-            List<MapOutputLocation> locList = 
-              mapLocations.get(loc.getHost());
-            
-            // Check if the list exists. The map-output-location mapping is
-            // cleared once the jobtracker restarts and is rebuilt from
-            // scratch, so we continue in the hope of finding some locations
-            // in the rebuilt mapping.
-            if (locList != null) {
-              // Add to the beginning of the list so that this map is
-              // tried again before the others, hastening the re-execution
-              // of this map should there be a problem
-              locList.add(0, loc);
-            }
-          }
-
-          if (retryFetches.size() > 0) {
-            LOG.info(reduceTask.getTaskID() + ": " +  
-                  "Got " + retryFetches.size() +
-                  " map-outputs from previous failures");
-          }
-          // clear the "failed" fetches hashmap
-          retryFetches.clear();
-
-          // now walk through the cache and schedule what we can
-          int numScheduled = 0;
-          int numDups = 0;
-          
-          synchronized (scheduledCopies) {
-  
-            // Randomize the map output locations to prevent 
-            // all reduce-tasks swamping the same tasktracker
-            List<String> hostList = new ArrayList<String>();
-            hostList.addAll(mapLocations.keySet()); 
-            
-            Collections.shuffle(hostList, this.random);
-              
-            Iterator<String> hostsItr = hostList.iterator();
-
-            while (hostsItr.hasNext()) {
-            
-              String host = hostsItr.next();
-
-              List<MapOutputLocation> knownOutputsByLoc = 
-                mapLocations.get(host);
-
-              // Check if the list exists. The map-output-location mapping is
-              // cleared once the jobtracker restarts and is rebuilt from
-              // scratch, so we continue in the hope of finding some locations
-              // in the rebuilt mapping and adding them for fetching.
-              if (knownOutputsByLoc == null || knownOutputsByLoc.size() == 0) {
-                continue;
-              }
-              
-              // Identify duplicate hosts here
-              if (uniqueHosts.contains(host)) {
-                numDups += knownOutputsByLoc.size();
-                continue;
-              }
-
-              Long penaltyEnd = penaltyBox.get(host);
-              boolean penalized = false;
-            
-              if (penaltyEnd != null) {
-                if (currentTime < penaltyEnd.longValue()) {
-                  penalized = true;
-                } else {
-                  penaltyBox.remove(host);
-                }
-              }
-              
-              if (penalized)
-                continue;
-
-              synchronized (knownOutputsByLoc) {
-              
-                locItr = knownOutputsByLoc.iterator();
-            
-                while (locItr.hasNext()) {
-              
-                  MapOutputLocation loc = locItr.next();
-              
-                  // Do not schedule fetches from OBSOLETE maps
-                  if (obsoleteMapIds.contains(loc.getTaskAttemptId())) {
-                    locItr.remove();
-                    continue;
-                  }
-
-                  uniqueHosts.add(host);
-                  scheduledCopies.add(loc);
-                  locItr.remove();  // remove from knownOutputs
-                  numInFlight++; numScheduled++;
-
-                  break; //we have a map from this host
-                }
-              }
-            }
-            scheduledCopies.notifyAll();
-          }
-
-          if (numScheduled > 0 || logNow) {
-            LOG.info(reduceTask.getTaskID() + " Scheduled " + numScheduled +
-                   " outputs (" + penaltyBox.size() +
-                   " slow hosts and" + numDups + " dup hosts)");
-          }
-
-          if (penaltyBox.size() > 0 && logNow) {
-            LOG.info("Penalized(slow) Hosts: ");
-            for (String host : penaltyBox.keySet()) {
-              LOG.info(host + " Will be considered after: " + 
-                  ((penaltyBox.get(host) - currentTime)/1000) + " seconds.");
-            }
-          }
-
-          // if we have no copies in flight and we can't schedule anything
-          // new, just wait for a bit
-          try {
-            if (numInFlight == 0 && numScheduled == 0) {
-              // we should indicate progress as we don't want TT to think
-              // we're stuck and kill us
-              reporter.progress();
-              Thread.sleep(5000);
-            }
-          } catch (InterruptedException e) { } // IGNORE
-          
-          while (numInFlight > 0 && mergeThrowable == null) {
-            LOG.debug(reduceTask.getTaskID() + " numInFlight = " + 
-                      numInFlight);
-            // The call to getCopyResult will either
-            // 1) return immediately with a null or a valid CopyResult object,
-            //    or
-            // 2) if numInFlight is above maxInFlight, return with a
-            //    CopyResult object after getting a notification from a
-            //    fetcher thread.
-            // So when getCopyResult returns null, we can be sure that we
-            // aren't busy enough and should go and get more map-completion
-            // events from the tasktracker
-            CopyResult cr = getCopyResult(numInFlight);
-
-            if (cr == null) {
-              break;
-            }
-            
-            if (cr.getSuccess()) {  // a successful copy
-              numCopied++;
-              lastProgressTime = System.currentTimeMillis();
-              reduceShuffleBytes.increment(cr.getSize());
-                
-              long secsSinceStart = 
-                (System.currentTimeMillis()-startTime)/1000+1;
-              float mbs = ((float)reduceShuffleBytes.getCounter())/(1024*1024);
-              float transferRate = mbs/secsSinceStart;
-                
-              copyPhase.startNextPhase();
-              copyPhase.setStatus("copy (" + numCopied + " of " + numMaps 
-                                  + " at " +
-                                  mbpsFormat.format(transferRate) +  " MB/s)");
-                
-              // Note successful fetch for this mapId to invalidate
-              // (possibly) old fetch-failures
-              fetchFailedMaps.remove(cr.getLocation().getTaskId());
-            } else if (cr.isObsolete()) {
-              //ignore
-              LOG.info(reduceTask.getTaskID() + 
-                       " Ignoring obsolete copy result for Map Task: " + 
-                       cr.getLocation().getTaskAttemptId() + " from host: " + 
-                       cr.getHost());
-            } else {
-              retryFetches.add(cr.getLocation());
-              
-              // note the failed-fetch
-              TaskAttemptID mapTaskId = cr.getLocation().getTaskAttemptId();
-              TaskID mapId = cr.getLocation().getTaskId();
-              
-              totalFailures++;
-              Integer noFailedFetches = 
-                mapTaskToFailedFetchesMap.get(mapTaskId);
-              noFailedFetches = 
-                (noFailedFetches == null) ? 1 : (noFailedFetches + 1);
-              mapTaskToFailedFetchesMap.put(mapTaskId, noFailedFetches);
-              LOG.info("Task " + getTaskID() + ": Failed fetch #" + 
-                       noFailedFetches + " from " + mapTaskId);
-
-              // halve the max fetch retries per map towards the end of
-              // the shuffle
-              int fetchRetriesPerMap = maxFetchRetriesPerMap;
-              int pendingCopies = numMaps - numCopied;
-              
-              // The check noFailedFetches != maxFetchRetriesPerMap is
-              // required to guarantee the notification in a corner case:
-              // if noFailedFetches has reached maxFetchRetriesPerMap just
-              // as the reducer reaches the end of the shuffle, we may miss
-              // sending a notification when the difference between
-              // noFailedFetches and fetchRetriesPerMap is not divisible by 2
-              if (pendingCopies <= numMaps * MIN_PENDING_MAPS_PERCENT &&
-                  noFailedFetches != maxFetchRetriesPerMap) {
-                fetchRetriesPerMap = fetchRetriesPerMap >> 1;
-              }
-              
-              // did the fetch fail too many times?
-              // using a hybrid technique for notifying the jobtracker.
-              //   a. the first notification is sent after max-retries 
-              //   b. subsequent notifications are sent after 2 retries.   
-              //   c. send notification immediately if it is a read error.   
-              if (cr.getError().equals(CopyOutputErrorType.READ_ERROR) ||
-                 ((noFailedFetches >= fetchRetriesPerMap) 
-                  && ((noFailedFetches - fetchRetriesPerMap) % 2) == 0)) {
-                synchronized (ReduceTask.this) {
-                  taskStatus.addFetchFailedMap(mapTaskId);
-                  reporter.progress();
-                  LOG.info("Failed to fetch map-output from " + mapTaskId + 
-                           " even after MAX_FETCH_RETRIES_PER_MAP retries... "
-                           + " or it is a read error, "
-                           + " reporting to the JobTracker");
-                }
-              }
-              // note unique failed-fetch maps
-              if (noFailedFetches == maxFetchRetriesPerMap) {
-                fetchFailedMaps.add(mapId);
-                  
-                // did we have too many unique failed-fetch maps?
-                // and did we fail on too many fetch attempts?
-                // and did we progress enough
-                //     or did we wait for too long without any progress?
-               
-                // check if the reducer is healthy
-                boolean reducerHealthy = 
-                    (((float)totalFailures / (totalFailures + numCopied)) 
-                     < MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT);
-                
-                // check if the reducer has progressed enough
-                boolean reducerProgressedEnough = 
-                    (((float)numCopied / numMaps) 
-                     >= MIN_REQUIRED_PROGRESS_PERCENT);
-                
-                // check if the reducer is stalled for a long time
-                // duration for which the reducer is stalled
-                int stallDuration = 
-                    (int)(System.currentTimeMillis() - lastProgressTime);
-                // duration for which the reducer ran with progress
-                int shuffleProgressDuration = 
-                    (int)(lastProgressTime - startTime);
-                // min time the reducer should run without getting killed
-                int minShuffleRunDuration = 
-                    (shuffleProgressDuration > maxMapRuntime) 
-                    ? shuffleProgressDuration 
-                    : maxMapRuntime;
-                boolean reducerStalled = 
-                    (((float)stallDuration / minShuffleRunDuration) 
-                     >= MAX_ALLOWED_STALL_TIME_PERCENT);
-                
-                // kill if not healthy and has insufficient progress
-                if ((fetchFailedMaps.size() >= maxFailedUniqueFetches ||
-                     fetchFailedMaps.size() == (numMaps - copiedMapOutputs.size()))
-                    && !reducerHealthy 
-                    && (!reducerProgressedEnough || reducerStalled)) { 
-                  LOG.fatal("Shuffle failed with too many fetch failures " + 
-                            "and insufficient progress!" +
-                            "Killing task " + getTaskID() + ".");
-                  umbilical.shuffleError(getTaskID(), 
-                                         "Exceeded MAX_FAILED_UNIQUE_FETCHES;"
-                                         + " bailing-out.");
-                }
-              }
-                
-              // back off exponentially while num_retries <= max_retries;
-              // back off by max_backoff/2 on subsequent failed attempts
-              currentTime = System.currentTimeMillis();
-              int currentBackOff = noFailedFetches <= fetchRetriesPerMap 
-                                   ? BACKOFF_INIT 
-                                     * (1 << (noFailedFetches - 1)) 
-                                   : (this.maxBackoff * 1000 / 2);
-              // If it is a read error,
-              //    back off for maxMapRuntime/2;
-              //    towards the end of the shuffle,
-              //      back off for min(maxMapRuntime/2, currentBackOff)
-              if (cr.getError().equals(CopyOutputErrorType.READ_ERROR)) {
-                int backOff = maxMapRuntime >> 1;
-                if (pendingCopies <= numMaps * MIN_PENDING_MAPS_PERCENT) {
-                  backOff = Math.min(backOff, currentBackOff); 
-                } 
-                currentBackOff = backOff;
-              }
-
-              penaltyBox.put(cr.getHost(), currentTime + currentBackOff);
-              LOG.warn(reduceTask.getTaskID() + " adding host " +
-                       cr.getHost() + " to penalty box, next contact in " +
-                       (currentBackOff/1000) + " seconds");
-            }
-            uniqueHosts.remove(cr.getHost());
-            numInFlight--;
-          }
-        }
-        
-        // all done, inform the copiers to exit
-        exitGetMapEvents = true;
-        try {
-          getMapEventsThread.join();
-          LOG.info("getMapsEventsThread joined.");
-        } catch (InterruptedException ie) {
-          LOG.info("getMapsEventsThread threw an exception: " +
-              StringUtils.stringifyException(ie));
-        }
-
-        synchronized (copiers) {
-          synchronized (scheduledCopies) {
-            for (MapOutputCopier copier : copiers) {
-              copier.interrupt();
-            }
-            copiers.clear();
-          }
-        }
-        
-        // copiers are done, exit and notify the waiting merge threads
-        synchronized (mapOutputFilesOnDisk) {
-          exitLocalFSMerge = true;
-          mapOutputFilesOnDisk.notify();
-        }
-        
-        ramManager.close();
-        
-        //Do a merge of in-memory files (if there are any)
-        if (mergeThrowable == null) {
-          try {
-            // Wait for the on-disk merge to complete
-            localFSMergerThread.join();
-            LOG.info("Interleaved on-disk merge complete: " + 
-                     mapOutputFilesOnDisk.size() + " files left.");
-            
-            //wait for an ongoing merge (if it is in flight) to complete
-            inMemFSMergeThread.join();
-            LOG.info("In-memory merge complete: " + 
-                     mapOutputsFilesInMemory.size() + " files left.");
-          } catch (InterruptedException ie) {
-            LOG.warn(reduceTask.getTaskID() +
-                     " Final merge of the inmemory files threw an exception: " + 
-                     StringUtils.stringifyException(ie));
-            // don't clobber an error already captured by the merge itself
-            if (mergeThrowable == null) {
-              mergeThrowable = ie;
-            }
-            return false;
-          }
-        }
-        return mergeThrowable == null && copiedMapOutputs.size() == numMaps;
-    }
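
The hybrid notification policy the loop above implements (first notify after
max retries, every second failure thereafter, immediately on a read error)
reduces to a small predicate; a sketch with hypothetical names, not part of
this patch:

    static boolean shouldNotifyJobTracker(boolean readError,
                                          int noFailedFetches,
                                          int fetchRetriesPerMap) {
      if (readError) {
        return true;                                          // case (c): immediate
      }
      return noFailedFetches >= fetchRetriesPerMap            // case (a): after max retries
          && (noFailedFetches - fetchRetriesPerMap) % 2 == 0; // case (b): every 2 after that
    }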
-    
-    private long createInMemorySegments(
-        List<Segment<K, V>> inMemorySegments, long leaveBytes)
-        throws IOException {
-      long totalSize = 0L;
-      synchronized (mapOutputsFilesInMemory) {
-        // fullSize could come from the RamManager, but a file may already
-        // be closed and not yet present in mapOutputsFilesInMemory
-        long fullSize = 0L;
-        for (MapOutput mo : mapOutputsFilesInMemory) {
-          fullSize += mo.data.length;
-        }
-        while (fullSize > leaveBytes) {
-          MapOutput mo = mapOutputsFilesInMemory.remove(0);
-          totalSize += mo.data.length;
-          fullSize -= mo.data.length;
-          Reader<K, V> reader = 
-            new InMemoryReader<K, V>(ramManager, mo.mapAttemptId,
-                                     mo.data, 0, mo.data.length);
-          Segment<K, V> segment = 
-            new Segment<K, V>(reader, true);
-          inMemorySegments.add(segment);
-        }
-      }
-      return totalSize;
-    }
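
The loop above drains resident map outputs head-first until at most
leaveBytes remain in memory. A minimal standalone sketch of that eviction
policy (hypothetical names, not part of this patch):

    import java.util.Deque;

    class EvictionSketch {
      // returns the number of bytes handed to the merge as in-memory segments
      static long evictUntil(Deque<Long> outputSizes, long leaveBytes) {
        long resident = outputSizes.stream().mapToLong(Long::longValue).sum();
        long evicted = 0;
        while (resident > leaveBytes && !outputSizes.isEmpty()) {
          long head = outputSizes.removeFirst();   // oldest map output first
          evicted += head;
          resident -= head;
        }
        return evicted;
      }
    }

For outputs of 40, 30, 20 and 10 MB with leaveBytes = 25 MB, the first three
are evicted (90 MB returned) and the 10 MB output stays resident.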
-
-    /**
-     * Create a RawKeyValueIterator from copied map outputs. All copying
-     * threads have exited, so all of the map outputs are available either in
-     * memory or on disk. We also know that no merges are in progress, so
-     * synchronization is more lax, here.
-     *
-     * The iterator returned must satisfy the following constraints:
-     *   1. Fewer than io.sort.factor files may be sources
-     *   2. No more than maxInMemReduce bytes of map outputs may be resident
-     *      in memory when the reduce begins
-     *
-     * If we must perform an intermediate merge to satisfy (1), then we can
-     * keep the excluded outputs from (2) in memory and include them in the
-     * first merge pass. If not, then said outputs must be written to disk
-     * first.
-     */
-    @SuppressWarnings("unchecked")
-    private RawKeyValueIterator createKVIterator(
-        JobConf job, FileSystem fs, Reporter reporter) throws IOException {
-
-      // merge config params
-      Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
-      Class<V> valueClass = (Class<V>)job.getMapOutputValueClass();
-      boolean keepInputs = job.getKeepFailedTaskFiles();
-      final Path tmpDir = new Path(getTaskID().toString());
-      final RawComparator<K> comparator =
-        (RawComparator<K>)job.getOutputKeyComparator();
-
-      // segments required to vacate memory
-      List<Segment<K,V>> memDiskSegments = new ArrayList<Segment<K,V>>();
-      long inMemToDiskBytes = 0;
-      // sortPhaseFinished will be set to true if we call merge() separately
-      // here to vacate memory (i.e. there will not be any intermediate
-      // merges; in other words, only the final merge is pending).
-      boolean sortPhaseFinished = false;
-      if (mapOutputsFilesInMemory.size() > 0) {
-        TaskID mapId = mapOutputsFilesInMemory.get(0).mapId;
-        inMemToDiskBytes = createInMemorySegments(memDiskSegments,
-            maxInMemReduce);
-        final int numMemDiskSegments = memDiskSegments.size();
-        if (numMemDiskSegments > 0 &&
-              ioSortFactor > mapOutputFilesOnDisk.size()) {
-          // As we have < ioSortFactor files on disk now, after this
-          // merging of inMemory segments we would have at most ioSortFactor
-          // files on disk, so only the final merge (directly feeding the
-          // reducers) will be pending, i.e. the reduce phase will be pending.
-          sortPhaseFinished = true;
-          
-          // must spill to disk, but can't retain in-mem for intermediate merge
-          final Path outputPath =
-              mapOutputFile.getInputFileForWrite(mapId, inMemToDiskBytes);
-          final RawKeyValueIterator rIter = Merger.merge(job, fs,
-              keyClass, valueClass, memDiskSegments, numMemDiskSegments,
-              tmpDir, comparator, reporter, spilledRecordsCounter, null,
-              sortPhase);
-
-          final Writer writer = new Writer(job, fs, outputPath,
-              keyClass, valueClass, codec, null);
-          try {
-            Merger.writeFile(rIter, writer, reporter, job);
-            addToMapOutputFilesOnDisk(fs.getFileStatus(outputPath));
-          } catch (Exception e) {
-            if (null != outputPath) {
-              fs.delete(outputPath, true);
-            }
-            throw new IOException("Final merge failed", e);
-          } finally {
-            if (null != writer) {
-              writer.close();
-            }
-          }
-          LOG.info("Merged " + numMemDiskSegments + " segments, " +
-                   inMemToDiskBytes + " bytes to disk to satisfy " +
-                   "reduce memory limit");
-          inMemToDiskBytes = 0;
-          memDiskSegments.clear();
-        } else if (inMemToDiskBytes != 0) {
-          LOG.info("Keeping " + numMemDiskSegments + " segments, " +
-                   inMemToDiskBytes + " bytes in memory for " +
-                   "intermediate, on-disk merge");
-        }
-      }
-
-      // segments on disk
-      List<Segment<K,V>> diskSegments = new ArrayList<Segment<K,V>>();
-      long onDiskBytes = inMemToDiskBytes;
-      Path[] onDisk = getMapFiles(fs, false);
-      for (Path file : onDisk) {
-        onDiskBytes += fs.getFileStatus(file).getLen();
-        diskSegments.add(new Segment<K, V>(job, fs, file, codec, keepInputs));
-      }
-      LOG.info("Merging " + onDisk.length + " files, " +
-               onDiskBytes + " bytes from disk");
-      Collections.sort(diskSegments, new Comparator<Segment<K,V>>() {
-        public int compare(Segment<K, V> o1, Segment<K, V> o2) {
-          if (o1.getLength() == o2.getLength()) {
-            return 0;
-          }
-          return o1.getLength() < o2.getLength() ? -1 : 1;
-        }
-      });
-
-      // build the final list of segments to merge, backed by disk + in-mem
-      List<Segment<K,V>> finalSegments = new ArrayList<Segment<K,V>>();
-      long inMemBytes = createInMemorySegments(finalSegments, 0);
-      LOG.info("Merging " + finalSegments.size() + " segments, " +
-               inMemBytes + " bytes from memory into reduce");
-      if (0 != onDiskBytes) {
-        final int numInMemSegments = memDiskSegments.size();
-        diskSegments.addAll(0, memDiskSegments);
-        memDiskSegments.clear();
-        Progress mergePhase = (sortPhaseFinished) ? null : sortPhase; 
-        RawKeyValueIterator diskMerge = Merger.merge(
-            job, fs, keyClass, valueClass, codec, diskSegments,
-            ioSortFactor, 0 == numInMemSegments ? 0 : numInMemSegments - 1,
-            tmpDir, comparator, reporter, false, spilledRecordsCounter, null,
-            mergePhase);
-        diskSegments.clear();
-        if (0 == finalSegments.size()) {
-          return diskMerge;
-        }
-        finalSegments.add(new Segment<K,V>(
-              new RawKVIteratorReader(diskMerge, onDiskBytes), true));
-      }
-      return Merger.merge(job, fs, keyClass, valueClass,
-                   finalSegments, finalSegments.size(), tmpDir,
-                   comparator, reporter, spilledRecordsCounter, null, null);
-    }
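
The branch near the top of createKVIterator() decides whether the in-memory
segments evicted to satisfy the memory limit can be merged straight to disk
without costing an extra pass. A sketch of that predicate (hypothetical name,
not part of this patch):

    // true when merging the evicted in-memory segments into one on-disk file
    // still leaves at most ioSortFactor files there, so that only the final
    // merge (feeding the reducer) remains
    static boolean spillToDiskFinishesSort(int numMemDiskSegments,
                                           int ioSortFactor,
                                           int filesOnDisk) {
      return numMemDiskSegments > 0 && ioSortFactor > filesOnDisk;
    }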
-
-    class RawKVIteratorReader extends IFile.Reader<K,V> {
-
-      private final RawKeyValueIterator kvIter;
-
-      public RawKVIteratorReader(RawKeyValueIterator kvIter, long size)
-          throws IOException {
-        super(null, null, size, null, spilledRecordsCounter);
-        this.kvIter = kvIter;
-      }
-      public boolean nextRawKey(DataInputBuffer key) throws IOException {
-        if (kvIter.next()) {
-          final DataInputBuffer kb = kvIter.getKey();
-          final int kp = kb.getPosition();
-          final int klen = kb.getLength() - kp;
-          key.reset(kb.getData(), kp, klen);
-          bytesRead += klen;
-          return true;
-        }
-        return false;
-      }
-      public void nextRawValue(DataInputBuffer value) throws IOException {
-        final DataInputBuffer vb = kvIter.getValue();
-        final int vp = vb.getPosition();
-        final int vlen = vb.getLength() - vp;
-        value.reset(vb.getData(), vp, vlen);
-        bytesRead += vlen;
-      }
-      public long getPosition() throws IOException {
-        return bytesRead;
-      }
-
-      public void close() throws IOException {
-        kvIter.close();
-      }
-    }
-
-    private CopyResult getCopyResult(int numInFlight) {  
-      synchronized (copyResults) {
-        while (copyResults.isEmpty()) {
-          try {
-            //The idea is that if we have scheduled enough, we can wait until
-            //we hear from one of the copiers.
-            if (busyEnough(numInFlight)) {
-              copyResults.wait();
-            } else {
-              return null;
-            }
-          } catch (InterruptedException e) { }
-        }
-        return copyResults.remove(0);
-      }    
-    }
-    
-    private void addToMapOutputFilesOnDisk(FileStatus status) {
-      synchronized (mapOutputFilesOnDisk) {
-        mapOutputFilesOnDisk.add(status);
-        mapOutputFilesOnDisk.notify();
-      }
-    }
-    
-    
-    
-    /** Starts merging the local copies (on disk) of the maps' output so
-     * that most of the reducer's input is already sorted, i.e. overlapping
-     * the shuffle and merge phases.
-     */
-    private class LocalFSMerger extends Thread {
-      private LocalFileSystem localFileSys;
-
-      public LocalFSMerger(LocalFileSystem fs) {
-        this.localFileSys = fs;
-        setName("Thread for merging on-disk files");
-        setDaemon(true);
-      }
-
-      @SuppressWarnings("unchecked")
-      public void run() {
-        try {
-          LOG.info(reduceTask.getTaskID() + " Thread started: " + getName());
-          while (!exitLocalFSMerge) {
-            synchronized (mapOutputFilesOnDisk) {
-              while (!exitLocalFSMerge &&
-                  mapOutputFilesOnDisk.size() < (2 * ioSortFactor - 1)) {
-                LOG.info(reduceTask.getTaskID() + " Thread waiting: " + getName());
-                mapOutputFilesOnDisk.wait();
-              }
-            }
-            if (exitLocalFSMerge) { // avoid running one extra time at the end
-              break;
-            }
-            List<Path> mapFiles = new ArrayList<Path>();
-            long approxOutputSize = 0;
-            int bytesPerSum = 
-              reduceTask.getConf().getInt("io.bytes.per.checksum", 512);
-            LOG.info(reduceTask.getTaskID() + "We have  " + 
-                mapOutputFilesOnDisk.size() + " map outputs on disk. " +
-                "Triggering merge of " + ioSortFactor + " files");
-            // 1. Prepare the list of files to be merged. This list is prepared
-            // using a list of map output files on disk. Currently we merge
-            // io.sort.factor files into 1.
-            synchronized (mapOutputFilesOnDisk) {
-              for (int i = 0; i < ioSortFactor; ++i) {
-                FileStatus filestatus = mapOutputFilesOnDisk.first();
-                mapOutputFilesOnDisk.remove(filestatus);
-                mapFiles.add(filestatus.getPath());
-                approxOutputSize += filestatus.getLen();
-              }
-            }
-            
-            // sanity check
-            if (mapFiles.size() == 0) {
-              return;
-            }
-            
-            // add the checksum length
-            approxOutputSize += ChecksumFileSystem
-                                .getChecksumLength(approxOutputSize,
-                                                   bytesPerSum);
-  
-            // 2. Start the on-disk merge process
-            Path outputPath = 
-              lDirAlloc.getLocalPathForWrite(mapFiles.get(0).toString(), 
-                                             approxOutputSize, conf)
-              .suffix(".merged");
-            Writer writer = 
-              new Writer(conf, rfs, outputPath,
-                         conf.getMapOutputKeyClass(), 
-                         conf.getMapOutputValueClass(),
-                         codec, null);
-            RawKeyValueIterator iter = null;
-            Path tmpDir = new Path(reduceTask.getTaskID().toString());
-            try {
-              iter = Merger.merge(conf, rfs,
-                                  conf.getMapOutputKeyClass(),
-                                  conf.getMapOutputValueClass(),
-                                  codec, mapFiles.toArray(new Path[mapFiles.size()]), 
-                                  true, ioSortFactor, tmpDir, 
-                                  conf.getOutputKeyComparator(), reporter,
-                                  spilledRecordsCounter, null, null);
-              
-              Merger.writeFile(iter, writer, reporter, conf);
-              writer.close();
-            } catch (Exception e) {
-              localFileSys.delete(outputPath, true);
-              throw new IOException(StringUtils.stringifyException(e));
-            }
-            
-            synchronized (mapOutputFilesOnDisk) {
-              addToMapOutputFilesOnDisk(localFileSys.getFileStatus(outputPath));
-            }
-            
-            LOG.info(reduceTask.getTaskID() +
-                     " Finished merging " + mapFiles.size() + 
-                     " map output files on disk of total-size " + 
-                     approxOutputSize + "." + 
-                     " Local output file is " + outputPath + " of size " +
-                     localFileSys.getFileStatus(outputPath).getLen());
-          }
-        } catch (Exception e) {
-          LOG.warn(reduceTask.getTaskID()
-                   + " Merging of the local FS files threw an exception: "
-                   + StringUtils.stringifyException(e));
-          if (mergeThrowable == null) {
-            mergeThrowable = e;
-          }
-        } catch (Throwable t) {
-          String msg = getTaskID() + " : Failed to merge on the local FS" 
-                       + StringUtils.stringifyException(t);
-          reportFatalError(getTaskID(), t, msg);
-        }
-      }
-    }
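
LocalFSMerger only wakes once 2 * ioSortFactor - 1 files have accumulated on
disk, so merging ioSortFactor of them into one still leaves ioSortFactor
files behind, enough to keep the merger from repeatedly rewriting the same
data. A small sketch of that arithmetic (hypothetical name, not part of this
patch):

    // with ioSortFactor = 10: the merger triggers at 19 files, merges the 10
    // smallest into one, and 10 files remain on disk
    static int filesAfterOneMergeRound(int ioSortFactor, int filesOnDisk) {
      if (filesOnDisk < 2 * ioSortFactor - 1) {
        return filesOnDisk;                   // below threshold: keep waiting
      }
      return filesOnDisk - ioSortFactor + 1;  // ioSortFactor inputs -> 1 output
    }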
-
-    private class InMemFSMergeThread extends Thread {
-      
-      public InMemFSMergeThread() {
-        setName("Thread for merging in memory files");
-        setDaemon(true);
-      }
-      
-      public void run() {
-        LOG.info(reduceTask.getTaskID() + " Thread started: " + getName());
-        try {
-          boolean exit = false;
-          do {
-            exit = ramManager.waitForDataToMerge();
-            if (!exit) {
-              doInMemMerge();
-            }
-          } while (!exit);
-        } catch (Exception e) {
-          LOG.warn(reduceTask.getTaskID() +
-                   " Merge of the inmemory files threw an exception: "
-                   + StringUtils.stringifyException(e));
-          ReduceCopier.this.mergeThrowable = e;
-        } catch (Throwable t) {
-          String msg = getTaskID() + " : Failed to merge in memory" 
-                       + StringUtils.stringifyException(t);
-          reportFatalError(getTaskID(), t, msg);
-        }
-      }
-      
-      @SuppressWarnings("unchecked")
-      private void doInMemMerge() throws IOException {
-        if (mapOutputsFilesInMemory.size() == 0) {
-          return;
-        }
-        
-        // Name this output file the same as the first file in the current
-        // list of in-memory files (that name is guaranteed to be absent on
-        // disk at the moment, so we don't overwrite a previously created
-        // spill). We also need to create the output file now, since it is
-        // not guaranteed to be present after merge is called (we delete
-        // empty files as soon as we see them in the merge method).
-
-        //figure out the mapId 
-        TaskID mapId = mapOutputsFilesInMemory.get(0).mapId;
-
-        List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K,V>>();
-        long mergeOutputSize = createInMemorySegments(inMemorySegments, 0);
-        int noInMemorySegments = inMemorySegments.size();
-
-        Path outputPath =
-            mapOutputFile.getInputFileForWrite(mapId, mergeOutputSize);
-
-        Writer writer = 
-          new Writer(conf, rfs, outputPath,
-                     conf.getMapOutputKeyClass(),
-                     conf.getMapOutputValueClass(),
-                     codec, null);
-
-        RawKeyValueIterator rIter = null;
-        try {
-          LOG.info("Initiating in-memory merge with " + noInMemorySegments + 
-                   " segments...");
-          
-          rIter = Merger.merge(conf, rfs,

[... 182 lines stripped ...]

