hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject svn commit: r810944 [4/4] - in /hadoop/mapreduce/trunk: ./ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/task/ src/java/org/apache/hadoop/mapreduce/task/reduce/...
Date Thu, 03 Sep 2009 13:56:22 GMT
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java?rev=810944&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java Thu Sep  3 13:56:21 2009
@@ -0,0 +1,764 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.task.reduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.ChecksumFileSystem;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.IFile;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapOutputFile;
+import org.apache.hadoop.mapred.Merger;
+import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.Task;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.IFile.Reader;
+import org.apache.hadoop.mapred.IFile.Writer;
+import org.apache.hadoop.mapred.Merger.Segment;
+import org.apache.hadoop.mapred.Task.CombineOutputCollector;
+import org.apache.hadoop.mapred.Task.CombineValuesIterator;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.task.reduce.MapOutput.MapOutputComparator;
+import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.ReflectionUtils;
+
@SuppressWarnings(value={"unchecked", "deprecation"})

/**
 * Manages the merge of map outputs fetched during the shuffle phase of a
 * reduce task.  Fetched outputs accumulate either in memory or on local
 * disk and are merged by three background threads: an optional
 * memory-to-memory merger, an in-memory (memory-to-disk) merger, and an
 * on-disk merger.  The final merge is performed in {@code close()}.
 */
public class MergeManager<K, V> {
  
  private static final Log LOG = LogFactory.getLog(MergeManager.class);
  
  /* Maximum percentage of the in-memory limit that a single shuffle can 
   * consume*/ 
  private static final float MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION = 0.25f;

  // Id of the reduce task this manager is merging inputs for.
  private final TaskAttemptID reduceId;
  
  private final JobConf jobConf;
  private final FileSystem localFS;
  // Raw (non-checksummed) view of the local file system.
  private final FileSystem rfs;
  private final LocalDirAllocator localDirAllocator;
  
  protected MapOutputFile mapOutputFile;
  
  // Outputs produced by the memory-to-memory merger, still resident in memory.
  Set<MapOutput<K, V>> inMemoryMergedMapOutputs = 
    new TreeSet<MapOutput<K,V>>(new MapOutputComparator<K, V>());
  private final IntermediateMemoryToMemoryMerger memToMemMerger;

  // Map outputs shuffled directly into memory, awaiting merge.
  Set<MapOutput<K, V>> inMemoryMapOutputs = 
    new TreeSet<MapOutput<K,V>>(new MapOutputComparator<K, V>());
  private final InMemoryMerger inMemoryMerger;
  
  // Paths of map outputs (and merged spills) held on local disk.
  Set<Path> onDiskMapOutputs = new TreeSet<Path>();
  private final OnDiskMerger onDiskMerger;
  
  // Total bytes of heap the shuffle may hold, and bytes currently reserved.
  // NOTE(review): both are ints, capping the shuffle buffer at ~2GB even on
  // larger heaps -- confirm whether these should be long.
  private final int memoryLimit;
  private int usedMemory;
  // Largest single map output allowed to be shuffled into memory.
  private final int maxSingleShuffleLimit;
  
  // Number of in-memory outputs that triggers a memory-to-memory merge.
  private final int memToMemMergeOutputsThreshold; 
  // Used-memory level that triggers the in-memory (memory-to-disk) merge.
  private final int mergeThreshold;
  
  // io.sort.factor: maximum number of streams merged at once.
  private final int ioSortFactor;

  private final Reporter reporter;
  private final ExceptionReporter exceptionReporter;
  
  /**
   * Combiner class to run during in-memory merge, if defined.
   */
  private final Class<? extends Reducer> combinerClass;

  /**
   * Resettable collector used for combine.
   */
  private final CombineOutputCollector<K,V> combineCollector;

  private final Counters.Counter spilledRecordsCounter;

  private final Counters.Counter reduceCombineInputCounter;

  private final Counters.Counter mergedMapOutputsCounter;
  
  private final CompressionCodec codec;
  
  // Progress object representing the reduce task's merge phase.
  private final Progress mergePhase;

  /**
   * Creates the manager, reads shuffle/merge configuration, and starts the
   * background merger threads.
   *
   * @param reduceId id of the reduce task whose inputs are being merged
   * @param jobConf job configuration
   * @param localFS local (checksummed) file system
   * @param localDirAllocator allocator for local scratch directories
   * @param reporter progress reporter
   * @param codec compression codec for intermediate data, or null
   * @param combinerClass combiner to run during merges, or null
   * @param combineCollector resettable collector used by the combiner
   * @param spilledRecordsCounter counter of records spilled to disk
   * @param reduceCombineInputCounter counter of combiner input records
   * @param mergedMapOutputsCounter counter of merged map outputs
   * @param exceptionReporter sink for fatal errors from merger threads
   * @param mergePhase progress object for the reduce's merge phase
   */
  public MergeManager(TaskAttemptID reduceId, JobConf jobConf, 
                      FileSystem localFS,
                      LocalDirAllocator localDirAllocator,  
                      Reporter reporter,
                      CompressionCodec codec,
                      Class<? extends Reducer> combinerClass,
                      CombineOutputCollector<K,V> combineCollector,
                      Counters.Counter spilledRecordsCounter,
                      Counters.Counter reduceCombineInputCounter,
                      Counters.Counter mergedMapOutputsCounter,
                      ExceptionReporter exceptionReporter,
                      Progress mergePhase) {
    this.reduceId = reduceId;
    this.jobConf = jobConf;
    this.localDirAllocator = localDirAllocator;
    this.exceptionReporter = exceptionReporter;
    
    this.reporter = reporter;
    this.codec = codec;
    this.combinerClass = combinerClass;
    this.combineCollector = combineCollector;
    this.reduceCombineInputCounter = reduceCombineInputCounter;
    this.spilledRecordsCounter = spilledRecordsCounter;
    this.mergedMapOutputsCounter = mergedMapOutputsCounter;
    this.mapOutputFile = new MapOutputFile();
    this.mapOutputFile.setConf(jobConf);
    
    this.localFS = localFS;
    // Use the raw file system for intermediate data; IFile carries its own
    // checksums.
    this.rfs = ((LocalFileSystem)localFS).getRaw();
    
    // Fraction of the heap usable for holding shuffled map outputs.
    final float maxInMemCopyUse =
      jobConf.getFloat("mapred.job.shuffle.input.buffer.percent", 0.90f);
    if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
      throw new IllegalArgumentException("Invalid value for " +
          "mapred.job.shuffle.input.buffer.percent: " +
          maxInMemCopyUse);
    }

    // Allow unit tests to fix Runtime memory
    // NOTE(review): the int arithmetic here limits the in-memory shuffle
    // buffer to Integer.MAX_VALUE bytes -- confirm whether long is needed.
    this.memoryLimit = 
      (int)(jobConf.getInt("mapred.job.reduce.total.mem.bytes",
          (int)Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE))
        * maxInMemCopyUse);
 
    this.ioSortFactor = jobConf.getInt("io.sort.factor", 100);

    // A single shuffled segment may consume at most a fixed fraction of the
    // in-memory buffer; anything larger is shuffled straight to disk.
    this.maxSingleShuffleLimit = 
      (int)(memoryLimit * MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION);
    this.memToMemMergeOutputsThreshold = 
            jobConf.getInt("mapred.memtomem.merge.threshold", ioSortFactor);
    this.mergeThreshold = (int)(this.memoryLimit * 
                          jobConf.getFloat("mapred.job.shuffle.merge.percent", 
                                           0.90f));
    LOG.info("MergerManager: memoryLimit=" + memoryLimit + ", " +
             "maxSingleShuffleLimit=" + maxSingleShuffleLimit + ", " +
             "mergeThreshold=" + mergeThreshold + ", " + 
             "ioSortFactor=" + ioSortFactor + ", " +
             "memToMemMergeOutputsThreshold=" + memToMemMergeOutputsThreshold);

    // Memory-to-memory merging is optional; when disabled the reference
    // stays null and callers must check for that.
    boolean allowMemToMemMerge = 
      jobConf.getBoolean("mapred.job.shuffle.allow.memtomem.merge", false);
    if (allowMemToMemMerge) {
      this.memToMemMerger = 
        new IntermediateMemoryToMemoryMerger(this,
                                             memToMemMergeOutputsThreshold);
      this.memToMemMerger.start();
    } else {
      this.memToMemMerger = null;
    }
    
    this.inMemoryMerger = new InMemoryMerger(this);
    this.inMemoryMerger.start();
    
    this.onDiskMerger = new OnDiskMerger(this);
    this.onDiskMerger.start();
    
    this.mergePhase = mergePhase;
  }
+  
+
  /** @return the id of the reduce task being fed by this merge manager. */
  TaskAttemptID getReduceId() {
    return reduceId;
  }
+
  /** Blocks until any in-progress in-memory merge has completed. */
  public void waitForInMemoryMerge() throws InterruptedException {
    inMemoryMerger.waitForMerge();
  }
+  
  /**
   * @return true if a map output of {@code requestedSize} bytes is small
   *         enough to be shuffled into memory rather than to disk.
   */
  private boolean canShuffleToMemory(long requestedSize) {
    return (requestedSize < maxSingleShuffleLimit); 
  }
  
  // Sentinel returned by reserve() to tell a fetcher to back off and retry
  // once memory has been released.
  final private MapOutput<K,V> stallShuffle = new MapOutput<K,V>(null);
+
  /**
   * Reserves space for a map output of the given size, deciding whether it
   * is fetched into memory or onto disk, or whether the fetcher must stall.
   *
   * @param mapId map task whose output is about to be fetched
   * @param requestedSize size of the map output in bytes
   * @param fetcher id of the fetcher thread making the request
   * @return a disk-backed MapOutput for oversized segments, an in-memory
   *         MapOutput when memory permits, or {@code stallShuffle} when the
   *         fetcher should retry later
   */
  public synchronized MapOutput<K,V> reserve(TaskAttemptID mapId, 
                                             long requestedSize,
                                             int fetcher
                                             ) throws IOException {
    if (!canShuffleToMemory(requestedSize)) {
      LOG.info(mapId + ": Shuffling to disk since " + requestedSize + 
               " is greater than maxSingleShuffleLimit (" + 
               maxSingleShuffleLimit + ")");
      return new MapOutput<K,V>(mapId, this, requestedSize, jobConf, 
                                localDirAllocator, fetcher, true);
    }
    
    // Stall shuffle if we are above the memory limit

    // It is possible that all threads could just be stalling and not make
    // progress at all. This could happen when:
    //
    // requested size is causing the used memory to go above limit &&
    // requested size < singleShuffleLimit &&
    // current used size < mergeThreshold (merge will not get triggered)
    //
    // To avoid this from happening, we allow exactly one thread to go past
    // the memory limit. We check (usedMemory > memoryLimit) and not
    // (usedMemory + requestedSize > memoryLimit). When this thread is done
    // fetching, this will automatically trigger a merge thereby unlocking
    // all the stalled threads
    
    if (usedMemory > memoryLimit) {
      LOG.debug(mapId + ": Stalling shuffle since usedMemory (" + usedMemory + 
               ") is greater than memoryLimit (" + memoryLimit + ")"); 
      
      return stallShuffle;
    }
    
    // Allow the in-memory shuffle to progress
    LOG.debug(mapId + ": Proceeding with shuffle since usedMemory (" +
        usedMemory + 
        ") is lesser than memoryLimit (" + memoryLimit + ")"); 
    return unconditionalReserve(mapId, requestedSize, true);
  }
+  
  /**
   * Unconditional Reserve is used by the Memory-to-Memory thread
   * @return a new in-memory {@link MapOutput} backed by {@code requestedSize}
   *         bytes of reserved shuffle memory
   */
  private synchronized MapOutput<K, V> unconditionalReserve(
      TaskAttemptID mapId, long requestedSize, boolean primaryMapOutput) {
    usedMemory += requestedSize;
    // NOTE(review): requestedSize is narrowed to int here; the (int) memory
    // limit keeps reservations under 2GB -- confirm no caller exceeds it.
    return new MapOutput<K,V>(mapId, this, (int)requestedSize, 
        primaryMapOutput);
  }
+  
  /** Returns {@code size} bytes of reserved shuffle memory to the pool. */
  synchronized void unreserve(long size) {
    usedMemory -= size;
  }
+  
+  public synchronized void closeInMemoryFile(MapOutput<K,V> mapOutput) { 
+    inMemoryMapOutputs.add(mapOutput);
+    LOG.info("closeInMemoryFile -> map-output of size: " + mapOutput.getSize()
+        + ", inMemoryMapOutputs.size() -> " + inMemoryMapOutputs.size());
+    
+    synchronized (inMemoryMerger) {
+      if (!inMemoryMerger.isInProgress() && usedMemory >= mergeThreshold) {
+        LOG.info("Starting inMemoryMerger's merge since usedMemory=" +
+            usedMemory + " > mergeThreshold=" + mergeThreshold);
+        inMemoryMapOutputs.addAll(inMemoryMergedMapOutputs);
+        inMemoryMergedMapOutputs.clear();
+        inMemoryMerger.startMerge(inMemoryMapOutputs);
+      } 
+    }
+    
+    if (memToMemMerger != null) {
+      synchronized (memToMemMerger) {
+        if (!memToMemMerger.isInProgress() && 
+            inMemoryMapOutputs.size() >= memToMemMergeOutputsThreshold) {
+          memToMemMerger.startMerge(inMemoryMapOutputs);
+        }
+      }
+    }
+  }
+  
+  
+  public synchronized void closeInMemoryMergedFile(MapOutput<K,V> mapOutput) {
+    inMemoryMergedMapOutputs.add(mapOutput);
+    LOG.info("closeInMemoryMergedFile -> size: " + mapOutput.getSize() + 
+             ", inMemoryMergedMapOutputs.size() -> " + 
+             inMemoryMergedMapOutputs.size());
+  }
+  
+  public synchronized void closeOnDiskFile(Path file) {
+    onDiskMapOutputs.add(file);
+    
+    synchronized (onDiskMerger) {
+      if (!onDiskMerger.isInProgress() && 
+          onDiskMapOutputs.size() >= (2 * ioSortFactor - 1)) {
+        onDiskMerger.startMerge(onDiskMapOutputs);
+      }
+    }
+  }
+  
  /**
   * Shuts down the background merger threads and performs the final merge
   * of all remaining in-memory and on-disk map outputs.
   *
   * @return an iterator over the fully merged input for the reduce
   */
  public RawKeyValueIterator close() throws Throwable {
    // Wait for on-going merges to complete
    if (memToMemMerger != null) { 
      memToMemMerger.close();
    }
    inMemoryMerger.close();
    onDiskMerger.close();
    
    // Snapshot what remains after the background mergers have quiesced.
    List<MapOutput<K, V>> memory = 
      new ArrayList<MapOutput<K, V>>(inMemoryMergedMapOutputs);
    memory.addAll(inMemoryMapOutputs);
    List<Path> disk = new ArrayList<Path>(onDiskMapOutputs);
    return finalMerge(jobConf, rfs, memory, disk);
  }
+   
+  private class IntermediateMemoryToMemoryMerger 
+  extends MergeThread<MapOutput<K, V>, K, V> {
+    
+    public IntermediateMemoryToMemoryMerger(MergeManager<K, V> manager, 
+                                            int mergeFactor) {
+      super(manager, mergeFactor, exceptionReporter);
+      setName("InMemoryMerger - Thread to do in-memory merge of in-memory " +
+      		    "shuffled map-outputs");
+      setDaemon(true);
+    }
+
+    @Override
+    public void merge(List<MapOutput<K, V>> inputs) throws IOException {
+      if (inputs == null || inputs.size() == 0) {
+        return;
+      }
+
+      TaskAttemptID dummyMapId = inputs.get(0).getMapId(); 
+      List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
+      long mergeOutputSize = 
+        createInMemorySegments(inputs, inMemorySegments, 0);
+      int noInMemorySegments = inMemorySegments.size();
+      
+      MapOutput<K, V> mergedMapOutputs = 
+        unconditionalReserve(dummyMapId, mergeOutputSize, false);
+      
+      Writer<K, V> writer = 
+        new InMemoryWriter<K, V>(mergedMapOutputs.getArrayStream());
+      
+      LOG.info("Initiating Memory-to-Memory merge with " + noInMemorySegments +
+               " segments of total-size: " + mergeOutputSize);
+
+      RawKeyValueIterator rIter = 
+        Merger.merge(jobConf, rfs,
+                     (Class<K>)jobConf.getMapOutputKeyClass(),
+                     (Class<V>)jobConf.getMapOutputValueClass(),
+                     inMemorySegments, inMemorySegments.size(),
+                     new Path(reduceId.toString()),
+                     (RawComparator<K>)jobConf.getOutputKeyComparator(),
+                     reporter, null, null, null);
+      Merger.writeFile(rIter, writer, reporter, jobConf);
+      writer.close();
+
+      LOG.info(reduceId +  
+               " Memory-to-Memory merge of the " + noInMemorySegments +
+               " files in-memory complete.");
+
+      // Note the output of the merge
+      closeInMemoryMergedFile(mergedMapOutputs);
+    }
+  }
+  
  /**
   * Background thread that merges in-memory map outputs and spills the
   * result to a single file on local disk, optionally running the combiner
   * during the spill.
   */
  private class InMemoryMerger extends MergeThread<MapOutput<K,V>, K,V> {
    
    public InMemoryMerger(MergeManager<K, V> manager) {
      super(manager, Integer.MAX_VALUE, exceptionReporter);
      setName
      ("InMemoryMerger - Thread to merge in-memory shuffled map-outputs");
      setDaemon(true);
    }
    
    /**
     * Merges {@code inputs} into one sorted on-disk file and registers it
     * via {@link MergeManager#closeOnDiskFile}.
     */
    @Override
    public void merge(List<MapOutput<K,V>> inputs) throws IOException {
      if (inputs == null || inputs.size() == 0) {
        return;
      }
      
      //name this output file same as the name of the first file that is 
      //there in the current list of inmem files (this is guaranteed to
      //be absent on the disk currently. So we don't overwrite a prev. 
      //created spill). Also we need to create the output file now since
      //it is not guaranteed that this file will be present after merge
      //is called (we delete empty files as soon as we see them
      //in the merge method)

      //figure out the mapId 
      TaskAttemptID mapId = inputs.get(0).getMapId();
      TaskID mapTaskId = mapId.getTaskID();

      List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
      long mergeOutputSize = 
        createInMemorySegments(inputs, inMemorySegments,0);
      int noInMemorySegments = inMemorySegments.size();

      Path outputPath = 
        mapOutputFile.getInputFileForWrite(mapTaskId,
                                           mergeOutputSize).suffix(
                                               Task.MERGED_OUTPUT_PREFIX);

      Writer<K,V> writer = 
        new Writer<K,V>(jobConf, rfs, outputPath,
                        (Class<K>) jobConf.getMapOutputKeyClass(),
                        (Class<V>) jobConf.getMapOutputValueClass(),
                        codec, null);

      RawKeyValueIterator rIter = null;
      try {
        LOG.info("Initiating in-memory merge with " + noInMemorySegments + 
                 " segments...");
        
        rIter = Merger.merge(jobConf, rfs,
                             (Class<K>)jobConf.getMapOutputKeyClass(),
                             (Class<V>)jobConf.getMapOutputValueClass(),
                             inMemorySegments, inMemorySegments.size(),
                             new Path(reduceId.toString()),
                             (RawComparator<K>)jobConf.getOutputKeyComparator(),
                             reporter, spilledRecordsCounter, null, null);
        
        // Run the combiner while spilling, if one is configured.
        if (null == combinerClass) {
          Merger.writeFile(rIter, writer, reporter, jobConf);
        } else {
          combineCollector.setWriter(writer);
          combineAndSpill(rIter, reduceCombineInputCounter);
        }
        writer.close();

        LOG.info(reduceId +  
            " Merge of the " + noInMemorySegments +
            " files in-memory complete." +
            " Local file is " + outputPath + " of size " + 
            localFS.getFileStatus(outputPath).getLen());
      } catch (IOException e) { 
        //make sure that we delete the ondisk file that we created 
        //earlier when we invoked cloneFileAttributes
        // NOTE(review): the writer is not closed on this failure path before
        // the delete -- confirm whether this can leak a stream handle.
        localFS.delete(outputPath, true);
        throw e;
      }

      // Note the output of the merge
      closeOnDiskFile(outputPath);
    }

  }
+  
  /**
   * Background thread that merges batches of on-disk map output files into
   * larger files, keeping the number of on-disk segments bounded.
   */
  private class OnDiskMerger extends MergeThread<Path,K,V> {
    
    public OnDiskMerger(MergeManager<K, V> manager) {
      super(manager, Integer.MAX_VALUE, exceptionReporter);
      setName("OnDiskMerger - Thread to merge on-disk map-outputs");
      setDaemon(true);
    }
    
    /**
     * Merges the given on-disk files into one file and registers the result
     * via {@link MergeManager#closeOnDiskFile}.
     */
    @Override
    public void merge(List<Path> inputs) throws IOException {
      // sanity check
      if (inputs == null || inputs.isEmpty()) {
        LOG.info("No ondisk files to merge...");
        return;
      }
      
      long approxOutputSize = 0;
      int bytesPerSum = 
        jobConf.getInt("io.bytes.per.checksum", 512);
      
      LOG.info("OnDiskMerger: We have  " + inputs.size() + 
               " map outputs on disk. Triggering merge...");
      
      // 1. Prepare the list of files to be merged. 
      for (Path file : inputs) {
        approxOutputSize += localFS.getFileStatus(file).getLen();
      }

      // add the checksum length
      approxOutputSize += 
        ChecksumFileSystem.getChecksumLength(approxOutputSize, bytesPerSum);

      // 2. Start the on-disk merge process
      Path outputPath = 
        localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(), 
            approxOutputSize, jobConf).suffix(Task.MERGED_OUTPUT_PREFIX);
      Writer<K,V> writer = 
        new Writer<K,V>(jobConf, rfs, outputPath, 
                        (Class<K>) jobConf.getMapOutputKeyClass(), 
                        (Class<V>) jobConf.getMapOutputValueClass(),
                        codec, null);
      RawKeyValueIterator iter  = null;
      Path tmpDir = new Path(reduceId.toString());
      try {
        iter = Merger.merge(jobConf, rfs,
                            (Class<K>) jobConf.getMapOutputKeyClass(),
                            (Class<V>) jobConf.getMapOutputValueClass(),
                            codec, inputs.toArray(new Path[inputs.size()]), 
                            true, ioSortFactor, tmpDir, 
                            (RawComparator<K>) jobConf.getOutputKeyComparator(), 
                            reporter, spilledRecordsCounter, null, 
                            mergedMapOutputsCounter, null);

        Merger.writeFile(iter, writer, reporter, jobConf);
        writer.close();
      } catch (IOException e) {
        // Clean up the partially written output before propagating.
        // NOTE(review): writer is not closed on this path -- confirm whether
        // that can leak a stream handle.
        localFS.delete(outputPath, true);
        throw e;
      }

      closeOnDiskFile(outputPath);

      LOG.info(reduceId +
          " Finished merging " + inputs.size() + 
          " map output files on disk of total-size " + 
          approxOutputSize + "." + 
          " Local output file is " + outputPath + " of size " +
          localFS.getFileStatus(outputPath).getLen());
    }
  }
+  
  /**
   * Runs the user's combiner over {@code kvIter}; the combiner's output is
   * emitted through {@link #combineCollector}, whose writer must already
   * have been set by the caller.
   *
   * @param kvIter sorted iterator over the records to combine
   * @param inCounter counter incremented for each combiner input record
   */
  private void combineAndSpill(
      RawKeyValueIterator kvIter,
      Counters.Counter inCounter) throws IOException {
    JobConf job = jobConf;
    Reducer combiner = ReflectionUtils.newInstance(combinerClass, job);
    Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
    Class<V> valClass = (Class<V>) job.getMapOutputValueClass();
    RawComparator<K> comparator = 
      (RawComparator<K>)job.getOutputKeyComparator();
    try {
      CombineValuesIterator values = new CombineValuesIterator(
          kvIter, comparator, keyClass, valClass, job, Reporter.NULL,
          inCounter);
      // Feed each key group to the combiner; output flows to combineCollector.
      while (values.more()) {
        combiner.reduce(values.getKey(), values, combineCollector,
                        Reporter.NULL);
        values.nextKey();
      }
    } finally {
      combiner.close();
    }
  }
+
  /**
   * Drains map outputs from {@code inMemoryMapOutputs} (a parameter that
   * shadows the field of the same name) into {@code inMemorySegments},
   * wrapping each as an in-memory reader, until at most {@code leaveBytes}
   * bytes of outputs remain in the list.
   *
   * @return the total byte size of the segments created
   */
  private long createInMemorySegments(List<MapOutput<K,V>> inMemoryMapOutputs,
                                      List<Segment<K, V>> inMemorySegments, 
                                      long leaveBytes
                                      ) throws IOException {
    long totalSize = 0L;
    // fullSize could come from the RamManager, but files can be
    // closed but not yet present in inMemoryMapOutputs
    long fullSize = 0L;
    for (MapOutput<K,V> mo : inMemoryMapOutputs) {
      fullSize += mo.getMemory().length;
    }
    while(fullSize > leaveBytes) {
      MapOutput<K,V> mo = inMemoryMapOutputs.remove(0);
      byte[] data = mo.getMemory();
      long size = data.length;
      totalSize += size;
      fullSize -= size;
      Reader<K,V> reader = new InMemoryReader<K,V>(MergeManager.this, 
                                                   mo.getMapId(),
                                                   data, 0, (int)size);
      // Only primary (directly shuffled) outputs count as merged map outputs.
      inMemorySegments.add(new Segment<K,V>(reader, true, 
                                            (mo.isPrimaryMapOutput() ? 
                                            mergedMapOutputsCounter : null)));
    }
    return totalSize;
  }
+
  /**
   * Adapts a {@link RawKeyValueIterator} to the {@link IFile.Reader}
   * interface so an already-merged stream can be fed into a further merge
   * as a single segment.
   */
  class RawKVIteratorReader extends IFile.Reader<K,V> {

    private final RawKeyValueIterator kvIter;

    public RawKVIteratorReader(RawKeyValueIterator kvIter, long size)
        throws IOException {
      super(null, null, size, null, spilledRecordsCounter);
      this.kvIter = kvIter;
    }

    /** Copies the iterator's next key into {@code key}; false at end. */
    public boolean nextRawKey(DataInputBuffer key) throws IOException {
      if (kvIter.next()) {
        final DataInputBuffer kb = kvIter.getKey();
        final int kp = kb.getPosition();
        final int klen = kb.getLength() - kp;
        key.reset(kb.getData(), kp, klen);
        bytesRead += klen;
        return true;
      }
      return false;
    }

    /** Copies the current value into {@code value}. */
    public void nextRawValue(DataInputBuffer value) throws IOException {
      final DataInputBuffer vb = kvIter.getValue();
      final int vp = vb.getPosition();
      final int vlen = vb.getLength() - vp;
      value.reset(vb.getData(), vp, vlen);
      bytesRead += vlen;
    }

    /** Position is reported as total bytes consumed from the iterator. */
    public long getPosition() throws IOException {
      return bytesRead;
    }

    public void close() throws IOException {
      kvIter.close();
    }
  }
+
  /**
   * Performs the terminal merge when the shuffle completes: spills enough
   * in-memory segments to disk to honor
   * mapred.job.reduce.input.buffer.percent, merges the on-disk files, and
   * stitches the result together with any segments retained in memory.
   *
   * @param job job configuration
   * @param fs raw local file system
   * @param inMemoryMapOutputs remaining in-memory outputs (consumed here)
   * @param onDiskMapOutputs remaining on-disk output paths
   * @return iterator over the fully merged reduce input
   */
  private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs,
                                       List<MapOutput<K,V>> inMemoryMapOutputs,
                                       List<Path> onDiskMapOutputs
                                       ) throws IOException {
    LOG.info("finalMerge called with " + 
             inMemoryMapOutputs.size() + " in-memory map-outputs and " + 
             onDiskMapOutputs.size() + " on-disk map-outputs");
    
    // Fraction of the heap that may keep map outputs in memory during the
    // reduce itself; 0 forces everything to disk.
    final float maxRedPer =
      job.getFloat("mapred.job.reduce.input.buffer.percent", 0f);
    if (maxRedPer > 1.0 || maxRedPer < 0.0) {
      throw new IOException("mapred.job.reduce.input.buffer.percent" +
                            maxRedPer);
    }
    int maxInMemReduce = (int)Math.min(
        Runtime.getRuntime().maxMemory() * maxRedPer, Integer.MAX_VALUE);
    

    // merge config params
    Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
    Class<V> valueClass = (Class<V>)job.getMapOutputValueClass();
    boolean keepInputs = job.getKeepFailedTaskFiles();
    final Path tmpDir = new Path(reduceId.toString());
    final RawComparator<K> comparator =
      (RawComparator<K>)job.getOutputKeyComparator();

    // segments required to vacate memory
    List<Segment<K,V>> memDiskSegments = new ArrayList<Segment<K,V>>();
    long inMemToDiskBytes = 0;
    boolean mergePhaseFinished = false;
    if (inMemoryMapOutputs.size() > 0) {
      TaskID mapId = inMemoryMapOutputs.get(0).getMapId().getTaskID();
      inMemToDiskBytes = createInMemorySegments(inMemoryMapOutputs, 
                                                memDiskSegments,
                                                maxInMemReduce);
      final int numMemDiskSegments = memDiskSegments.size();
      if (numMemDiskSegments > 0 &&
            ioSortFactor > onDiskMapOutputs.size()) {
        
        // If we reach here, it implies that we have less than io.sort.factor
        // disk segments and this will be incremented by 1 (result of the 
        // memory segments merge). Since this total would still be 
        // <= io.sort.factor, we will not do any more intermediate merges,
        // the merge of all these disk segments would be directly fed to the
        // reduce method
        
        mergePhaseFinished = true;
        // must spill to disk, but can't retain in-mem for intermediate merge
        final Path outputPath = 
          mapOutputFile.getInputFileForWrite(mapId,
                                             inMemToDiskBytes).suffix(
                                                 Task.MERGED_OUTPUT_PREFIX);
        final RawKeyValueIterator rIter = Merger.merge(job, fs,
            keyClass, valueClass, memDiskSegments, numMemDiskSegments,
            tmpDir, comparator, reporter, spilledRecordsCounter, null, 
            mergePhase);
        final Writer<K,V> writer = new Writer<K,V>(job, fs, outputPath,
            keyClass, valueClass, codec, null);
        try {
          Merger.writeFile(rIter, writer, reporter, job);
          // add to list of final disk outputs.
          onDiskMapOutputs.add(outputPath);
        } catch (IOException e) {
          if (null != outputPath) {
            try {
              fs.delete(outputPath, true);
            } catch (IOException ie) {
              // NOTHING
            }
          }
          throw e;
        } finally {
          if (null != writer) {
            writer.close();
          }
        }
        LOG.info("Merged " + numMemDiskSegments + " segments, " +
                 inMemToDiskBytes + " bytes to disk to satisfy " +
                 "reduce memory limit");
        inMemToDiskBytes = 0;
        memDiskSegments.clear();
      } else if (inMemToDiskBytes != 0) {
        LOG.info("Keeping " + numMemDiskSegments + " segments, " +
                 inMemToDiskBytes + " bytes in memory for " +
                 "intermediate, on-disk merge");
      }
    }

    // segments on disk
    List<Segment<K,V>> diskSegments = new ArrayList<Segment<K,V>>();
    long onDiskBytes = inMemToDiskBytes;
    Path[] onDisk = onDiskMapOutputs.toArray(new Path[onDiskMapOutputs.size()]);
    for (Path file : onDisk) {
      onDiskBytes += fs.getFileStatus(file).getLen();
      LOG.debug("Disk file: " + file + " Length is " + 
          fs.getFileStatus(file).getLen());
      diskSegments.add(new Segment<K, V>(job, fs, file, codec, keepInputs,
                                         (file.toString().endsWith(
                                             Task.MERGED_OUTPUT_PREFIX) ?
                                          null : mergedMapOutputsCounter)
                                        ));
    }
    LOG.info("Merging " + onDisk.length + " files, " +
             onDiskBytes + " bytes from disk");
    // Merge smallest segments first.
    Collections.sort(diskSegments, new Comparator<Segment<K,V>>() {
      public int compare(Segment<K, V> o1, Segment<K, V> o2) {
        if (o1.getLength() == o2.getLength()) {
          return 0;
        }
        return o1.getLength() < o2.getLength() ? -1 : 1;
      }
    });

    // build final list of segments from merged backed by disk + in-mem
    List<Segment<K,V>> finalSegments = new ArrayList<Segment<K,V>>();
    long inMemBytes = createInMemorySegments(inMemoryMapOutputs, 
                                             finalSegments, 0);
    LOG.info("Merging " + finalSegments.size() + " segments, " +
             inMemBytes + " bytes from memory into reduce");
    if (0 != onDiskBytes) {
      final int numInMemSegments = memDiskSegments.size();
      diskSegments.addAll(0, memDiskSegments);
      memDiskSegments.clear();
      // Pass mergePhase only if there is a going to be intermediate
      // merges. See comment where mergePhaseFinished is being set
      Progress thisPhase = (mergePhaseFinished) ? null : mergePhase; 
      RawKeyValueIterator diskMerge = Merger.merge(
          job, fs, keyClass, valueClass, diskSegments,
          ioSortFactor, numInMemSegments, tmpDir, comparator,
          reporter, false, spilledRecordsCounter, null, thisPhase);
      diskSegments.clear();
      if (0 == finalSegments.size()) {
        return diskMerge;
      }
      // Wrap the disk merge as one segment alongside the in-memory segments.
      finalSegments.add(new Segment<K,V>(
            new RawKVIteratorReader(diskMerge, onDiskBytes), true));
    }
    return Merger.merge(job, fs, keyClass, valueClass,
                 finalSegments, finalSegments.size(), tmpDir,
                 comparator, reporter, spilledRecordsCounter, null,
                 null);
  
  }
+}

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java?rev=810944&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java Thu Sep  3 13:56:21 2009
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.task.reduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A reusable background merge thread: it sleeps until
+ * {@link #startMerge(Set)} hands it a batch of up to {@code mergeFactor}
+ * inputs, runs the subclass-supplied {@link #merge(List)}, and then goes
+ * back to waiting.  State transitions are guarded by this object's monitor;
+ * {@code inputs} is intentionally read without the lock inside run() (see
+ * the IS2_INCONSISTENT_SYNC findbugs exclusion added for this class).
+ */
+abstract class MergeThread<T,K,V> extends Thread {
+  
+  private static final Log LOG = LogFactory.getLog(MergeThread.class);
+
+  // True while a merge is running; waitForMerge() blocks until it is false.
+  private volatile boolean inProgress = false;
+  // Inputs of the current merge; replaced wholesale by startMerge() and
+  // nulled out in run()'s finally block once the merge completes or fails.
+  private List<T> inputs = new ArrayList<T>();
+  protected final MergeManager<K,V> manager;
+  private final ExceptionReporter reporter;
+  // Once set by close(), startMerge() refuses any further work.
+  private boolean closed = false;
+  // Upper bound on how many inputs a single merge round may take.
+  private final int mergeFactor;
+  
+  public MergeThread(MergeManager<K,V> manager, int mergeFactor,
+                     ExceptionReporter reporter) {
+    this.manager = manager;
+    this.mergeFactor = mergeFactor;
+    this.reporter = reporter;
+  }
+  
+  /**
+   * Refuse further merges, wait for any in-flight merge to finish, then
+   * interrupt the thread out of its wait loop in run().
+   */
+  public synchronized void close() throws InterruptedException {
+    closed = true;
+    waitForMerge();
+    interrupt();
+  }
+
+  /** @return true while a merge is currently running. */
+  public synchronized boolean isInProgress() {
+    return inProgress;
+  }
+  
+  /**
+   * Take up to {@code mergeFactor} elements out of the caller's set (they
+   * are removed from it via the iterator) and wake the merge thread; any
+   * elements beyond the limit stay in the caller's set for a later round.
+   * NOTE(review): this does not check inProgress, so a call made while a
+   * merge is still running would replace 'inputs' mid-merge -- callers are
+   * presumably expected to serialize starts; confirm against MergeManager.
+   */
+  public synchronized void startMerge(Set<T> inputs) {
+    if (!closed) {
+      inProgress = true;
+      this.inputs = new ArrayList<T>();
+      Iterator<T> iter=inputs.iterator();
+      for (int ctr = 0; iter.hasNext() && ctr < mergeFactor; ++ctr) {
+        this.inputs.add(iter.next());
+        iter.remove();
+      }
+      LOG.info(getName() + ": Starting merge with " + this.inputs.size() + 
+               " segments, while ignoring " + inputs.size() + " segments");
+      notifyAll();
+    }
+  }
+
+  /** Block until no merge is in progress. */
+  public synchronized void waitForMerge() throws InterruptedException {
+    while (inProgress) {
+      wait();
+    }
+  }
+
+  public void run() {
+    while (true) {
+      try {
+        // Wait for notification to start the merge...
+        synchronized (this) {
+          while (!inProgress) {
+            wait();
+          }
+        }
+
+        // Merge
+        merge(inputs);
+      } catch (InterruptedException ie) {
+        // close() interrupts us to shut the thread down.
+        return;
+      } catch(Throwable t) {
+        // Surface the failure to the reduce task instead of dying silently.
+        reporter.reportException(t);
+        return;
+      } finally {
+        // Runs on success, failure and interrupt alike: release the inputs
+        // and wake anyone blocked in waitForMerge().
+        synchronized (this) {
+          // Clear inputs
+          inputs = null;
+          inProgress = false;        
+          notifyAll();
+        }
+      }
+    }
+  }
+
+  /** Subclass hook that performs the actual merge of one batch of inputs. */
+  public abstract void merge(List<T> inputs) throws IOException;
+}

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java?rev=810944&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java Thu Sep  3 13:56:21 2009
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.task.reduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.Task;
+import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
+import org.apache.hadoop.mapred.Task.CombineOutputCollector;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.util.Progress;
+
+/**
+ * Runs the shuffle (copy) phase of a reduce task: starts one event-fetcher
+ * thread that learns of completed maps over the umbilical, plus a pool of
+ * fetcher threads that pull the map outputs, waits until every map output
+ * has been copied, shuts the threads down and returns the merged key/value
+ * iterator for the reduce.  Shuffle also serves as the
+ * {@link ExceptionReporter} through which those threads surface fatal
+ * errors back to this (the main) thread.
+ */
+public class Shuffle<K, V> implements ExceptionReporter {
+  private static final Log LOG = LogFactory.getLog(Shuffle.class);
+  // Interval (ms) between progress heartbeats while waiting for the copies.
+  private static final int PROGRESS_FREQUENCY = 2000;
+  
+  private final TaskAttemptID reduceId;
+  private final JobConf jobConf;
+  private final Reporter reporter;
+  private final ShuffleClientMetrics metrics;
+  private final TaskUmbilicalProtocol umbilical;
+  
+  private final ShuffleScheduler<K,V> scheduler;
+  private final MergeManager<K, V> merger;
+  // First error reported by any shuffle thread; guarded by 'this'.
+  private Throwable throwable = null;
+  private String throwingThreadName = null;
+  private final Progress copyPhase;
+  private final TaskStatus taskStatus;
+  private final Task reduceTask; //Used for status updates
+  
+  public Shuffle(TaskAttemptID reduceId, JobConf jobConf, FileSystem localFS,
+                 TaskUmbilicalProtocol umbilical,
+                 LocalDirAllocator localDirAllocator,  
+                 Reporter reporter,
+                 CompressionCodec codec,
+                 Class<? extends Reducer> combinerClass,
+                 CombineOutputCollector<K,V> combineCollector,
+                 Counters.Counter spilledRecordsCounter,
+                 Counters.Counter reduceCombineInputCounter,
+                 Counters.Counter shuffledMapsCounter,
+                 Counters.Counter reduceShuffleBytes,
+                 Counters.Counter failedShuffleCounter,
+                 Counters.Counter mergedMapOutputsCounter,
+                 TaskStatus status,
+                 Progress copyPhase,
+                 Progress mergePhase,
+                 Task reduceTask) {
+    this.reduceId = reduceId;
+    this.jobConf = jobConf;
+    this.umbilical = umbilical;
+    this.reporter = reporter;
+    this.metrics = new ShuffleClientMetrics(reduceId, jobConf);
+    this.copyPhase = copyPhase;
+    this.taskStatus = status;
+    this.reduceTask = reduceTask;
+    
+    // Scheduler decides which host each fetcher pulls from next; merger
+    // absorbs the fetched outputs (in memory or on disk) as they arrive.
+    scheduler = 
+      new ShuffleScheduler<K,V>(jobConf, status, this, copyPhase, 
+                                shuffledMapsCounter, 
+                                reduceShuffleBytes, failedShuffleCounter);
+    merger = new MergeManager<K, V>(reduceId, jobConf, localFS, 
+                                    localDirAllocator, reporter, codec, 
+                                    combinerClass, combineCollector, 
+                                    spilledRecordsCounter, 
+                                    reduceCombineInputCounter, 
+                                    mergedMapOutputsCounter, 
+                                    this, mergePhase);
+  }
+  
+  /**
+   * Execute the whole shuffle and return the iterator the reduce will
+   * consume.
+   * @throws ShuffleError if any shuffle thread reported a fatal error
+   */
+  @SuppressWarnings("unchecked")
+  public RawKeyValueIterator run() throws IOException, InterruptedException {
+    // Start the map-completion events fetcher thread
+    final EventFetcher<K,V> eventFetcher = 
+      new EventFetcher<K,V>(reduceId, umbilical, scheduler, this);
+    eventFetcher.start();
+    
+    // Start the map-output fetcher threads
+    final int numFetchers = jobConf.getInt("mapred.reduce.parallel.copies", 5);
+    Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];
+    for (int i=0; i < numFetchers; ++i) {
+      fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger, 
+                                     reporter, metrics, this);
+      fetchers[i].start();
+    }
+    
+    // Wait for shuffle to complete successfully
+    // NOTE(review): if an error is reported here we throw without
+    // interrupting the event-fetcher/fetcher threads started above;
+    // presumably the failing task JVM exits anyway -- confirm.
+    while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {
+      reporter.progress();
+      
+      // Re-throw (on this thread) any error a shuffle thread reported.
+      synchronized (this) {
+        if (throwable != null) {
+          throw new ShuffleError("error in shuffle in " + throwingThreadName,
+                                 throwable);
+        }
+      }
+    }
+
+    // Stop the event-fetcher thread
+    eventFetcher.interrupt();
+    try {
+      eventFetcher.join();
+    } catch(Throwable t) {
+      LOG.info("Failed to stop " + eventFetcher.getName(), t);
+    }
+    
+    // Stop the map-output fetcher threads
+    for (Fetcher<K,V> fetcher : fetchers) {
+      fetcher.interrupt();
+    }
+    for (Fetcher<K,V> fetcher : fetchers) {
+      fetcher.join();
+    }
+    fetchers = null;
+    
+    // stop the scheduler
+    scheduler.close();
+
+    copyPhase.complete(); // copy is already complete
+    taskStatus.setPhase(TaskStatus.Phase.SORT);
+    reduceTask.statusUpdate(umbilical);
+
+    // Finish the on-going merges...
+    RawKeyValueIterator kvIter = null;
+    try {
+      kvIter = merger.close();
+    } catch (Throwable e) {
+      throw new ShuffleError("Error while doing final merge " , e);
+    }
+
+    // Sanity check: catch errors reported after the copy loop exited.
+    synchronized (this) {
+      if (throwable != null) {
+        throw new ShuffleError("error in shuffle in " + throwingThreadName,
+                               throwable);
+      }
+    }
+    
+    return kvIter;
+  }
+
+  /**
+   * Record the first fatal error seen by any shuffle thread; later reports
+   * are ignored so run() always surfaces the original failure.
+   */
+  public synchronized void reportException(Throwable t) {
+    if (throwable == null) {
+      throwable = t;
+      throwingThreadName = Thread.currentThread().getName();
+      // Notify the scheduler so that the reporting thread finds the 
+      // exception immediately.
+      synchronized (scheduler) {
+        scheduler.notifyAll();
+      }
+    }
+  }
+  
+  /** Fatal shuffle failure; an IOException so existing callers can catch it. */
+  public static class ShuffleError extends IOException {
+    private static final long serialVersionUID = 5753909320586607881L;
+
+    ShuffleError(String msg, Throwable t) {
+      super(msg, t);
+    }
+  }
+}

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java?rev=810944&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java Thu Sep  3 13:56:21 2009
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.task.reduce;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.Updater;
+
+/**
+ * Accumulates shuffle-client counters (bytes copied, fetch successes and
+ * failures, busy fetcher threads) and periodically publishes them to the
+ * "mapred" metrics context as the "shuffleInput" record.  Registered as a
+ * metrics {@link Updater}; all counter mutation is synchronized on this.
+ */
+class ShuffleClientMetrics implements Updater {
+
+  private final MetricsRecord shuffleMetrics;
+  // Counters accumulated between pushes; reset inside doUpdates().
+  private int failedFetches = 0;
+  private int successFetches = 0;
+  private long bytesShuffled = 0;
+  // Gauge (not reset): how many fetcher threads are busy right now.
+  private int busyThreads = 0;
+  private final int numCopiers;
+  
+  ShuffleClientMetrics(TaskAttemptID reduceId, JobConf jobConf) {
+    numCopiers = jobConf.getInt("mapred.reduce.parallel.copies", 5);
+
+    MetricsContext context = MetricsUtil.getContext("mapred");
+    shuffleMetrics = MetricsUtil.createRecord(context, "shuffleInput");
+    shuffleMetrics.setTag("user", jobConf.getUser());
+    shuffleMetrics.setTag("jobName", jobConf.getJobName());
+    shuffleMetrics.setTag("jobId", reduceId.getJobID().toString());
+    shuffleMetrics.setTag("taskId", reduceId.toString());
+    shuffleMetrics.setTag("sessionId", jobConf.getSessionId());
+    context.registerUpdater(this);
+  }
+  /** Record bytes copied from a map output. */
+  public synchronized void inputBytes(long numBytes) {
+    bytesShuffled += numBytes;
+  }
+  /** Record one failed fetch attempt. */
+  public synchronized void failedFetch() {
+    failedFetches++;
+  }
+  /** Record one successful fetch. */
+  public synchronized void successFetch() {
+    successFetches++;
+  }
+  /** A fetcher thread started doing work. */
+  public synchronized void threadBusy() {
+    busyThreads++;
+  }
+  /** A fetcher thread went idle again. */
+  public synchronized void threadFree() {
+    busyThreads--;
+  }
+  /**
+   * Push the accumulated counters into the metrics record and reset them.
+   * Invoked periodically by the metrics system.
+   */
+  public void doUpdates(MetricsContext unused) {
+    synchronized (this) {
+      shuffleMetrics.incrMetric("shuffle_input_bytes", bytesShuffled);
+      shuffleMetrics.incrMetric("shuffle_failed_fetches", 
+                                failedFetches);
+      shuffleMetrics.incrMetric("shuffle_success_fetches", 
+                                successFetches);
+      if (numCopiers != 0) {
+        shuffleMetrics.setMetric("shuffle_fetchers_busy_percent",
+            100*((float)busyThreads/numCopiers));
+      } else {
+        shuffleMetrics.setMetric("shuffle_fetchers_busy_percent", 0);
+      }
+      bytesShuffled = 0;
+      successFetches = 0;
+      failedFetches = 0;
+    }
+    // update() is deliberately called outside our lock so we do not hold
+    // the monitor while the metrics framework does its own work.
+    shuffleMetrics.update();
+  }
+}

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleHeader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleHeader.java?rev=810944&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleHeader.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleHeader.java Thu Sep  3 13:56:21 2009
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.task.reduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * Header that precedes each map output on the shuffle wire.  Written by the
+ * TaskTracker's map-output server and deciphered by the Fetcher thread of
+ * the reduce task.  Wire layout: mapId (bounded string), compressed length
+ * (vlong), uncompressed length (vlong), target reduce number (vint).
+ */
+public class ShuffleHeader implements Writable {
+
+  /**
+   * The longest possible length of task attempt id that we will accept.
+   */
+  private static final int MAX_ID_LENGTH = 1000;
+
+  String mapId;            // attempt id of the map this output came from
+  long uncompressedLength; // payload size once decompressed
+  long compressedLength;   // payload size as sent on the wire
+  int forReduce;           // reduce partition this output is destined for
+  
+  /** No-arg constructor for deserialization via {@link #readFields}. */
+  public ShuffleHeader() { }
+  
+  public ShuffleHeader(String mapId, long compressedLength,
+      long uncompressedLength, int forReduce) {
+    this.forReduce = forReduce;
+    this.uncompressedLength = uncompressedLength;
+    this.compressedLength = compressedLength;
+    this.mapId = mapId;
+  }
+  
+  public void readFields(DataInput in) throws IOException {
+    // Bounded read so a corrupt stream cannot make us allocate an
+    // arbitrarily large id string.
+    mapId = WritableUtils.readStringSafely(in, MAX_ID_LENGTH);
+    compressedLength = WritableUtils.readVLong(in);
+    uncompressedLength = WritableUtils.readVLong(in);
+    forReduce = WritableUtils.readVInt(in);
+  }
+
+  public void write(DataOutput out) throws IOException {
+    // Field order must mirror readFields() exactly.
+    Text.writeString(out, mapId);
+    WritableUtils.writeVLong(out, compressedLength);
+    WritableUtils.writeVLong(out, uncompressedLength);
+    WritableUtils.writeVInt(out, forReduce);
+  }
+}
\ No newline at end of file

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java?rev=810944&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java Thu Sep  3 13:56:21 2009
@@ -0,0 +1,422 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.task.reduce;
+
+import java.io.IOException;
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.task.reduce.MapHost.State;
+import org.apache.hadoop.util.Progress;
+
+/**
+ * Book-keeper for the shuffle: tracks which map outputs are still needed,
+ * which hosts currently have outputs ready to fetch, and which hosts are
+ * serving a back-off penalty after failures.  Fetcher threads obtain work
+ * through {@link #getHost()} and {@link #getMapsForHost(MapHost)} and report
+ * results through {@link #copySucceeded} / {@link #copyFailed}.  All mutable
+ * state is guarded by this object's monitor.
+ */
+class ShuffleScheduler<K,V> {
+  // Per-fetcher-thread timestamp of the moment a host was assigned to it;
+  // used by freeHost() to log how long that fetch round took.
+  static ThreadLocal<Long> shuffleStart = new ThreadLocal<Long>() {
+    protected Long initialValue() {
+      return 0L;
+    }
+  };
+
+  private static final Log LOG = LogFactory.getLog(ShuffleScheduler.class);
+  // Cap on how many map outputs one fetch round may take from one host.
+  private static final int MAX_MAPS_AT_ONCE = 20;
+  // Base back-off (ms) applied to a host after a failure...
+  private static final long INITIAL_PENALTY = 10000;
+  // ...growing geometrically with each subsequent failure.
+  private static final float PENALTY_GROWTH_RATE = 1.3f;
+  // Tell the JobTracker about a map after every this-many fetch failures.
+  private final static int REPORT_FAILURE_LIMIT = 10;
+
+  private final boolean[] finishedMaps;  // indexed by map TaskID id
+  private final int totalMaps;
+  private int remainingMaps;
+  // hostname -> MapHost holding that host's known, un-fetched map outputs
+  private Map<String, MapHost> mapLocations = new HashMap<String, MapHost>();
+  // Hosts with outputs available and no fetcher currently assigned.
+  private Set<MapHost> pendingHosts = new HashSet<MapHost>();
+  // Map attempts whose output must no longer be fetched.
+  private Set<TaskAttemptID> obsoleteMaps = new HashSet<TaskAttemptID>();
+  
+  private final Random random = new Random(System.currentTimeMillis());
+  // Penalized hosts, released back to pendingHosts by the Referee thread.
+  private final DelayQueue<Penalty> penalties = new DelayQueue<Penalty>();
+  private final Referee referee = new Referee();
+  private final Map<TaskAttemptID,IntWritable> failureCounts =
+    new HashMap<TaskAttemptID,IntWritable>();
+  private final Map<String,IntWritable> hostFailures = 
+    new HashMap<String,IntWritable>();
+  private final TaskStatus status;
+  private final ExceptionReporter reporter;
+  // Failure count for a single map at which the whole reduce is aborted.
+  private final int abortFailureLimit;
+  private final Progress progress;
+  private final Counters.Counter shuffledMapsCounter;
+  private final Counters.Counter reduceShuffleBytes;
+  private final Counters.Counter failedShuffleCounter;
+  
+  private final long startTime;
+  private long lastProgressTime;
+  
+  private int maxMapRuntime = 0;
+  private int maxFailedUniqueFetches = 5;
+  
+  private long totalBytesShuffledTillNow = 0;
+  private DecimalFormat  mbpsFormat = new DecimalFormat("0.00");
+
+  
+  public ShuffleScheduler(JobConf job, TaskStatus status,
+                          ExceptionReporter reporter,
+                          Progress progress,
+                          Counters.Counter shuffledMapsCounter,
+                          Counters.Counter reduceShuffleBytes,
+                          Counters.Counter failedShuffleCounter) {
+    totalMaps = job.getNumMapTasks();
+    // Abort after 30 failures of one map, or 10% of the maps, whichever
+    // is larger.
+    abortFailureLimit = Math.max(30, totalMaps / 10);
+    remainingMaps = totalMaps;
+    finishedMaps = new boolean[remainingMaps];
+    this.reporter = reporter;
+    this.status = status;
+    this.progress = progress;
+    this.shuffledMapsCounter = shuffledMapsCounter;
+    this.reduceShuffleBytes = reduceShuffleBytes;
+    this.failedShuffleCounter = failedShuffleCounter;
+    this.startTime = System.currentTimeMillis();
+    lastProgressTime = startTime;
+    referee.start();
+    this.maxFailedUniqueFetches = Math.min(totalMaps,
+        this.maxFailedUniqueFetches);
+  }
+
+  /**
+   * Record a successfully fetched map output: commit it, update progress,
+   * counters and the transfer-rate status string, and wake waiters in
+   * {@link #waitUntilDone(int)} when the last map completes.  A duplicate
+   * success for an already-finished map is silently ignored.
+   */
+  public synchronized void copySucceeded(TaskAttemptID mapId, 
+                                         MapHost host,
+                                         long bytes,
+                                         long millis,
+                                         MapOutput<K,V> output
+                                         ) throws IOException {
+    failureCounts.remove(mapId);
+    hostFailures.remove(host.getHostName());
+    int mapIndex = mapId.getTaskID().getId();
+    
+    if (!finishedMaps[mapIndex]) {
+      output.commit();
+      finishedMaps[mapIndex] = true;
+      shuffledMapsCounter.increment(1);
+      if (--remainingMaps == 0) {
+        notifyAll();   // the shuffle is complete
+      }
+
+      // update the status
+      totalBytesShuffledTillNow += bytes;
+      float mbs = (float) totalBytesShuffledTillNow / (1024 * 1024);
+      int mapsDone = totalMaps - remainingMaps;
+      // +1 avoids a divide-by-zero in the first second
+      long secsSinceStart = 
+        (System.currentTimeMillis()-startTime)/1000+1;
+
+      float transferRate = mbs/secsSinceStart;
+      progress.set((float) mapsDone / totalMaps);
+      String statusString = mapsDone + " / " + totalMaps + " copied.";
+      status.setStateString(statusString);
+      progress.setStatus("copy(" + mapsDone + " of " + totalMaps 
+          + " at " +
+          mbpsFormat.format(transferRate) +  " MB/s)");
+      
+      reduceShuffleBytes.increment(bytes);
+      lastProgressTime = System.currentTimeMillis();
+      LOG.debug("map " + mapId + " done " + statusString);
+    }
+  }
+
+  /**
+   * Record a failed fetch: penalize the host, bump the per-map and per-host
+   * failure counts, abort the reduce if a single map has failed too many
+   * times, possibly notify the JobTracker, and re-check reducer health.
+   */
+  public synchronized void copyFailed(TaskAttemptID mapId, MapHost host,
+                                      boolean readError) {
+    host.penalize();
+    int failures = 1;
+    if (failureCounts.containsKey(mapId)) {
+      IntWritable x = failureCounts.get(mapId);
+      x.set(x.get() + 1);
+      failures = x.get();
+    } else {
+      failureCounts.put(mapId, new IntWritable(1));      
+    }
+    String hostname = host.getHostName();
+    if (hostFailures.containsKey(hostname)) {
+      IntWritable x = hostFailures.get(hostname);
+      x.set(x.get() + 1);
+    } else {
+      hostFailures.put(hostname, new IntWritable(1));
+    }
+    if (failures >= abortFailureLimit) {
+      // Too many failures on one map: fail the whole reduce.  The throw /
+      // catch is just a convenient way to build the IOException in place.
+      try {
+        throw new IOException(failures + " failures downloading " + mapId);
+      } catch (IOException ie) {
+        reporter.reportException(ie);
+      }
+    }
+    
+    // Notify the JobTracker after every 'reportFailureLimit' failures
+    checkAndInformJobTracker(failures, mapId, readError);
+
+    // NOTE(review): health is checked before failedShuffleCounter is
+    // incremented below, so the counter lags this failure by one.
+    checkReducerHealth();
+    
+    // Exponential back-off for the failing host.
+    long delay = (long) (INITIAL_PENALTY *
+        Math.pow(PENALTY_GROWTH_RATE, failures));
+    
+    penalties.add(new Penalty(host, delay));
+    
+    failedShuffleCounter.increment(1);
+  }
+  
+  // Report the fetch-failed map to the JobTracker (via the task status) on
+  // every read error, and otherwise on every REPORT_FAILURE_LIMIT-th failure.
+  private void checkAndInformJobTracker(
+      int failures, TaskAttemptID mapId, boolean readError) {
+    if (readError || ((failures % REPORT_FAILURE_LIMIT) == 0)) {
+      status.addFetchFailedMap((org.apache.hadoop.mapred.TaskAttemptID) mapId);
+    }
+  }
+    
+  /**
+   * Decide whether this reducer should kill itself: it does so when too many
+   * distinct maps are failing AND the overall failure ratio is high AND the
+   * reducer has either made too little progress or been stalled too long.
+   */
+  private void checkReducerHealth() {
+    final float MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT = 0.5f;
+    final float MIN_REQUIRED_PROGRESS_PERCENT = 0.5f;
+    final float MAX_ALLOWED_STALL_TIME_PERCENT = 0.5f;
+
+    long totalFailures = failedShuffleCounter.getValue();
+    int doneMaps = totalMaps - remainingMaps;
+    
+    boolean reducerHealthy =
+      (((float)totalFailures / (totalFailures + doneMaps))
+          < MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT);
+    
+    // check if the reducer has progressed enough
+    boolean reducerProgressedEnough =
+      (((float)doneMaps / totalMaps)
+          >= MIN_REQUIRED_PROGRESS_PERCENT);
+
+    // check if the reducer is stalled for a long time
+    // duration for which the reducer is stalled
+    int stallDuration =
+      (int)(System.currentTimeMillis() - lastProgressTime);
+    
+    // duration for which the reducer ran with progress
+    int shuffleProgressDuration =
+      (int)(lastProgressTime - startTime);
+
+    // min time the reducer should run without getting killed
+    int minShuffleRunDuration =
+      (shuffleProgressDuration > maxMapRuntime)
+      ? shuffleProgressDuration
+          : maxMapRuntime;
+    
+    boolean reducerStalled =
+      (((float)stallDuration / minShuffleRunDuration)
+          >= MAX_ALLOWED_STALL_TIME_PERCENT);
+
+    // kill if not healthy and has insufficient progress
+    if ((failureCounts.size() >= maxFailedUniqueFetches ||
+        failureCounts.size() == (totalMaps - doneMaps))
+        && !reducerHealthy
+        && (!reducerProgressedEnough || reducerStalled)) {
+      LOG.fatal("Shuffle failed with too many fetch failures " +
+      "and insufficient progress!");
+      String errorMsg = "Exceeded MAX_FAILED_UNIQUE_FETCHES; bailing-out.";
+      reporter.reportException(new IOException(errorMsg));
+    }
+
+  }
+  
+  /** Mark a map TIP as done from our perspective (e.g. it was killed). */
+  public synchronized void tipFailed(TaskID taskId) {
+    finishedMaps[taskId.getId()] = true;
+  }
+  
+  /**
+   * Register a newly discovered map output on the given host and wake a
+   * fetcher if the host just became pending.
+   */
+  public synchronized void addKnownMapOutput(String hostName, 
+                                             String hostUrl,
+                                             TaskAttemptID mapId) {
+    MapHost host = mapLocations.get(hostName);
+    if (host == null) {
+      host = new MapHost(hostName, hostUrl);
+      mapLocations.put(hostName, host);
+    }
+    host.addKnownMap(mapId);
+
+    // Mark the host as pending 
+    if (host.getState() == State.PENDING) {
+      pendingHosts.add(host);
+      notifyAll();
+    }
+  }
+  
+  /** Mark a map attempt's output as no longer fetchable. */
+  public synchronized void obsoleteMapOutput(TaskAttemptID mapId) {
+    obsoleteMaps.add(mapId);
+  }
+  
+  /** Return an un-fetched map output to its host (e.g. after a failure). */
+  public synchronized void putBackKnownMapOutput(MapHost host, 
+                                                 TaskAttemptID mapId) {
+    host.addKnownMap(mapId);
+  }
+
+  /**
+   * Block until some host has pending map outputs, then pick one of the
+   * pending hosts uniformly at random, mark it busy and hand it to the
+   * calling fetcher thread.
+   */
+  public synchronized MapHost getHost() throws InterruptedException {
+      while(pendingHosts.isEmpty()) {
+        wait();
+      }
+      
+      // Advance a fresh iterator a random number of steps: a uniform pick
+      // from the (unordered) pending set.
+      MapHost host = null;
+      Iterator<MapHost> iter = pendingHosts.iterator();
+      int numToPick = random.nextInt(pendingHosts.size());
+      for (int i=0; i <= numToPick; ++i) {
+        host = iter.next();
+      }
+      
+      pendingHosts.remove(host);     
+      host.markBusy();
+      
+      LOG.info("Assigning " + host + " with " + host.getNumKnownMapOutputs() + 
+               " to " + Thread.currentThread().getName());
+      shuffleStart.set(System.currentTimeMillis());
+      
+      return host;
+  }
+  
+  /**
+   * Take up to MAX_MAPS_AT_ONCE still-needed map outputs from the host for
+   * this fetch round; outputs beyond the limit are put back on the host.
+   * Obsolete and already-finished maps are dropped.
+   */
+  public synchronized List<TaskAttemptID> getMapsForHost(MapHost host) {
+    List<TaskAttemptID> list = host.getAndClearKnownMaps();
+    Iterator<TaskAttemptID> itr = list.iterator();
+    List<TaskAttemptID> result = new ArrayList<TaskAttemptID>();
+    int includedMaps = 0;
+    int totalSize = list.size();
+    // find the maps that we still need, up to the limit
+    while (itr.hasNext()) {
+      TaskAttemptID id = itr.next();
+      if (!obsoleteMaps.contains(id) && !finishedMaps[id.getTaskID().getId()]) {
+        result.add(id);
+        if (++includedMaps >= MAX_MAPS_AT_ONCE) {
+          break;
+        }
+      }
+    }
+    // put back the maps left after the limit
+    while (itr.hasNext()) {
+      TaskAttemptID id = itr.next();
+      if (!obsoleteMaps.contains(id) && !finishedMaps[id.getTaskID().getId()]) {
+        host.addKnownMap(id);
+      }
+    }
+    LOG.info("assigned " + includedMaps + " of " + totalSize + " to " +
+             host + " to " + Thread.currentThread().getName());
+    return result;
+  }
+
+  /**
+   * Release a host after a fetch round; if it still has known outputs (and
+   * is not penalized) it re-enters the pending set and a fetcher is woken.
+   */
+  public synchronized void freeHost(MapHost host) {
+    if (host.getState() != State.PENALIZED) {
+      if (host.markAvailable() == State.PENDING) {
+        pendingHosts.add(host);
+        notifyAll();
+      }
+    }
+    // Note: the elapsed value is in milliseconds.
+    LOG.info(host + " freed by " + Thread.currentThread().getName() + " in " + 
+             (System.currentTimeMillis()-shuffleStart.get()) + "ms");
+  }
+    
+  /** Forget all known map locations (e.g. before re-fetching events). */
+  public synchronized void resetKnownMaps() {
+    mapLocations.clear();
+    obsoleteMaps.clear();
+    pendingHosts.clear();
+  }
+  
+  /**
+   * Wait until the shuffle finishes or until the timeout.
+   * @param millis maximum wait time
+   * @return true if the shuffle is done
+   * @throws InterruptedException
+   */
+  public synchronized boolean waitUntilDone(int millis
+                                            ) throws InterruptedException {
+    if (remainingMaps > 0) {
+      wait(millis);
+      return remainingMaps == 0;
+    }
+    return true;
+  }
+  
+  /**
+   * A structure that records the penalty for a host.
+   */
+  private static class Penalty implements Delayed {
+    MapHost host;
+    private long endTime;
+    
+    Penalty(MapHost host, long delay) {
+      this.host = host;
+      this.endTime = System.currentTimeMillis() + delay;
+    }
+
+    public long getDelay(TimeUnit unit) {
+      long remainingTime = endTime - System.currentTimeMillis();
+      return unit.convert(remainingTime, TimeUnit.MILLISECONDS);
+    }
+
+    public int compareTo(Delayed o) {
+      // Explicit three-way compare; subtraction could overflow.
+      long other = ((Penalty) o).endTime;
+      return endTime == other ? 0 : (endTime < other ? -1 : 1);
+    }
+    
+  }
+  
+  /**
+   * A thread that takes hosts off of the penalty list when the timer expires.
+   */
+  private class Referee extends Thread {
+    public Referee() {
+      setName("ShufflePenaltyReferee");
+      setDaemon(true);
+    }
+
+    public void run() {
+      try {
+        while (true) {
+          // take the first host that has an expired penalty
+          MapHost host = penalties.take().host;
+          synchronized (ShuffleScheduler.this) {
+            if (host.markAvailable() == MapHost.State.PENDING) {
+              pendingHosts.add(host);
+              ShuffleScheduler.this.notifyAll();
+            }
+          }
+        }
+      } catch (InterruptedException ie) {
+        // close() interrupts us for shutdown.
+        return;
+      } catch (Throwable t) {
+        reporter.reportException(t);
+      }
+    }
+  }
+  
+  /** Stop the penalty-referee thread and wait for it to exit. */
+  public void close() throws InterruptedException {
+    referee.interrupt();
+    referee.join();
+  }
+
+  /** Track the longest observed map runtime, used by the stall heuristic. */
+  public synchronized void informMaxMapRunTime(int duration) {
+    if (duration > maxMapRuntime) {
+      maxMapRuntime = duration;
+    }
+  }
+}

Modified: hadoop/mapreduce/trunk/src/test/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/findbugsExcludeFile.xml?rev=810944&r1=810943&r2=810944&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/findbugsExcludeFile.xml (original)
+++ hadoop/mapreduce/trunk/src/test/findbugsExcludeFile.xml Thu Sep  3 13:56:21 2009
@@ -155,10 +155,33 @@
      </Match>
 
      <Match>
+       <Class name="org.apache.hadoop.mapreduce.task.reduce.MergeThread" />
+       <Field name="inputs" />
+       <Bug pattern="IS2_INCONSISTENT_SYNC" />
+     </Match>
+
+    <!--
+     This class is unlikely to get subclassed, so ignore
+    -->
+     <Match>
+       <Class name="org.apache.hadoop.mapreduce.task.reduce.MergeManager" />
+       <Bug pattern="SC_START_IN_CTOR" />
+     </Match>
+
+    <!--
+      equals is intentionally not implemented for Penalty; it is not needed here
+    -->
+     <Match>
+      <Class name="org.apache.hadoop.mapreduce.task.reduce.ShuffleScheduler$Penalty" />
+      <Bug pattern="EQ_COMPARETO_USE_OBJECT_EQUALS" />
+     </Match>
+
+     <Match>
        <Class name="org.apache.hadoop.mapred.Task" />
        <Method name="reportFatalError" />
        <Bug pattern="DM_EXIT" />
      </Match>
+
      <!-- 
         core changes 
      -->

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java?rev=810944&r1=810943&r2=810944&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java Thu Sep  3 13:56:21 2009
@@ -237,6 +237,11 @@
         public boolean getIsMap() {
           return t.isMapTask();
         }
+
+        @Override
+        public void addFetchFailedMap(TaskAttemptID mapTaskId) {
+          
+        }
       };
       status.setRunState(TaskStatus.State.RUNNING);
       trackers.get(taskTrackerName).getStatus().getTaskReports().add(status);

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetch.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetch.java?rev=810944&r1=810943&r2=810944&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetch.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetch.java Thu Sep  3 13:56:21 2009
@@ -37,7 +37,7 @@
     job.setNumMapTasks(MAP_TASKS);
     job.set(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, "-Xmx128m");
     job.setInt("mapred.job.reduce.total.mem.bytes", 128 << 20);
-    job.set("mapred.job.shuffle.input.buffer.percent", "0.14");
+    job.set("mapred.job.shuffle.input.buffer.percent", "0.05");
     job.setInt("io.sort.factor", 2);
     job.setInt("mapred.inmem.merge.threshold", 4);
     Counters c = runJob(job);

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetchFromPartialMem.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetchFromPartialMem.java?rev=810944&r1=810943&r2=810944&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetchFromPartialMem.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetchFromPartialMem.java Thu Sep  3 13:56:21 2009
@@ -77,7 +77,7 @@
 
   /** Verify that at least one segment does not hit disk */
   public void testReduceFromPartialMem() throws Exception {
-    final int MAP_TASKS = 5;
+    final int MAP_TASKS = 7;
     JobConf job = mrCluster.createJobConf();
     job.setNumMapTasks(MAP_TASKS);
     job.setInt("mapred.inmem.merge.threshold", 0);



Mime
View raw message