tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject svn commit: r1469642 [23/36] - in /incubator/tez/branches/TEZ-1: ./ example_jobs/ example_jobs/sampleInput/ example_jobs/wc_mr_6m_1r/ example_jobs/wc_mrr_6m_3r_3r/ ljr_helper/ tez-common/ tez-common/src/ tez-common/src/main/ tez-common/src/main/java/ t...
Date Thu, 18 Apr 2013 23:54:28 GMT
Added: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,793 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.common.shuffle.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ChecksumFileSystem;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.Constants;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezTaskReporter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.engine.api.Processor;
+import org.apache.tez.engine.common.ConfigUtils;
+import org.apache.tez.engine.common.combine.CombineInput;
+import org.apache.tez.engine.common.combine.CombineOutput;
+import org.apache.tez.engine.common.sort.impl.IFile;
+import org.apache.tez.engine.common.sort.impl.TezMerger;
+import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
+import org.apache.tez.engine.common.sort.impl.IFile.Writer;
+import org.apache.tez.engine.common.sort.impl.TezMerger.Segment;
+import org.apache.tez.engine.common.task.local.output.TezTaskOutputFiles;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.engine.records.TezTaskID;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+@SuppressWarnings(value={"rawtypes"})
+public class MergeManager {
+  
+  private static final Log LOG = LogFactory.getLog(MergeManager.class);
+  
+  // Reduce-side task attempt this merge manager belongs to.
+  private final TezTaskAttemptID taskAttemptId;
+  
+  private final Configuration conf;
+  private final FileSystem localFS;
+  // Raw (non-checksummed) view of the local file system.
+  private final FileSystem rfs;
+  private final LocalDirAllocator localDirAllocator;
+  
+  private final  TezTaskOutputFiles mapOutputFile;
+  
+  // Outputs produced by the memory-to-memory merger, still held in memory.
+  Set<MapOutput> inMemoryMergedMapOutputs = 
+    new TreeSet<MapOutput>(new MapOutput.MapOutputComparator());
+  private final IntermediateMemoryToMemoryMerger memToMemMerger;
+
+  // Freshly fetched map outputs held in memory, awaiting merge.
+  Set<MapOutput> inMemoryMapOutputs = 
+    new TreeSet<MapOutput>(new MapOutput.MapOutputComparator());
+  private final InMemoryMerger inMemoryMerger;
+  
+  // Paths of map outputs that have been written/merged to local disk.
+  Set<Path> onDiskMapOutputs = new TreeSet<Path>();
+  private final OnDiskMerger onDiskMerger;
+  
+  // Total memory budget for in-memory shuffle data.
+  private final long memoryLimit;
+  // Memory currently reserved by fetchers (may briefly exceed memoryLimit —
+  // see the comment in reserve()).
+  private long usedMemory;
+  // Memory held by fully fetched ("committed") in-memory outputs.
+  private long commitMemory;
+  // Largest single map output allowed to be shuffled into memory.
+  private final long maxSingleShuffleLimit;
+  
+  // Number of in-memory segments that triggers a memory-to-memory merge.
+  private final int memToMemMergeOutputsThreshold; 
+  // commitMemory level that triggers an in-memory -> disk merge.
+  private final long mergeThreshold;
+  
+  private final int ioSortFactor;
+
+  private final TezTaskReporter reporter;
+  private final ExceptionReporter exceptionReporter;
+  
+  /**
+   * Combiner processor to run during in-memory merge, if defined.
+   */
+  private final Processor combineProcessor;
+
+  private final TezCounter spilledRecordsCounter;
+
+  private final TezCounter reduceCombineInputCounter;
+
+  private final TezCounter mergedMapOutputsCounter;
+  
+  private final CompressionCodec codec;
+  
+  private final Progress mergePhase;
+
+  /**
+   * Sets up the shuffle merge machinery: reads memory and merge thresholds
+   * from configuration, validates them, and starts the (optional)
+   * memory-to-memory, in-memory and on-disk merger threads.
+   *
+   * @param taskAttemptId reduce task attempt this manager serves
+   * @param conf job configuration
+   * @param localFS local file system (must be a LocalFileSystem)
+   * @param localDirAllocator allocator for local scratch directories
+   * @param reporter progress/status reporter
+   * @param combineProcessor optional combiner run during in-memory merges
+   * @param spilledRecordsCounter counter for records spilled to disk
+   * @param reduceCombineInputCounter counter for combiner input records
+   * @param mergedMapOutputsCounter counter for merged map outputs
+   * @param exceptionReporter sink for fatal merger-thread failures
+   * @param mergePhase progress object for the merge phase
+   */
+  public MergeManager(TezTaskAttemptID taskAttemptId, 
+                      Configuration conf, 
+                      FileSystem localFS,
+                      LocalDirAllocator localDirAllocator,  
+                      TezTaskReporter reporter,
+                      Processor combineProcessor,
+                      TezCounter spilledRecordsCounter,
+                      TezCounter reduceCombineInputCounter,
+                      TezCounter mergedMapOutputsCounter,
+                      ExceptionReporter exceptionReporter,
+                      Progress mergePhase) {
+    this.taskAttemptId = taskAttemptId;
+    this.conf = conf;
+    this.localDirAllocator = localDirAllocator;
+    this.exceptionReporter = exceptionReporter;
+    
+    this.reporter = reporter;
+    this.combineProcessor = combineProcessor;
+    this.reduceCombineInputCounter = reduceCombineInputCounter;
+    this.spilledRecordsCounter = spilledRecordsCounter;
+    this.mergedMapOutputsCounter = mergedMapOutputsCounter;
+    this.mapOutputFile = new TezTaskOutputFiles();
+    this.mapOutputFile.setConf(conf);
+    
+    this.localFS = localFS;
+    this.rfs = ((LocalFileSystem)localFS).getRaw();
+
+    if (ConfigUtils.getCompressMapOutput(conf)) {
+      Class<? extends CompressionCodec> codecClass =
+          ConfigUtils.getMapOutputCompressorClass(conf, DefaultCodec.class);
+      codec = ReflectionUtils.newInstance(codecClass, conf);
+    } else {
+      codec = null;
+    }
+
+    final float maxInMemCopyUse =
+      conf.getFloat(
+          TezJobConfig.TEZ_ENGINE_SHUFFLE_INPUT_BUFFER_PERCENT, 
+          TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_INPUT_BUFFER_PERCENT);
+    if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
+      throw new IllegalArgumentException("Invalid value for " +
+          TezJobConfig.TEZ_ENGINE_SHUFFLE_INPUT_BUFFER_PERCENT + ": " +
+          maxInMemCopyUse);
+    }
+
+    // Allow unit tests to fix Runtime memory
+    this.memoryLimit = 
+      (long)(conf.getLong(Constants.TEZ_ENGINE_TASK_MEMORY,
+          Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE))
+        * maxInMemCopyUse);
+ 
+    this.ioSortFactor = 
+        conf.getInt(
+            TezJobConfig.TEZ_ENGINE_IO_SORT_FACTOR, 
+            TezJobConfig.DEFAULT_TEZ_ENGINE_IO_SORT_FACTOR);
+
+    final float singleShuffleMemoryLimitPercent =
+        conf.getFloat(
+            TezJobConfig.TEZ_ENGINE_SHUFFLE_MEMORY_LIMIT_PERCENT,
+            TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_MEMORY_LIMIT_PERCENT);
+    if (singleShuffleMemoryLimitPercent <= 0.0f
+        || singleShuffleMemoryLimitPercent > 1.0f) {
+      throw new IllegalArgumentException("Invalid value for "
+          + TezJobConfig.TEZ_ENGINE_SHUFFLE_MEMORY_LIMIT_PERCENT + ": "
+          + singleShuffleMemoryLimitPercent);
+    }
+
+    this.maxSingleShuffleLimit = 
+      (long)(memoryLimit * singleShuffleMemoryLimitPercent);
+    this.memToMemMergeOutputsThreshold = 
+            conf.getInt(
+                TezJobConfig.TEZ_ENGINE_SHUFFLE_MEMTOMEM_SEGMENTS, 
+                ioSortFactor);
+    this.mergeThreshold = 
+        (long)(this.memoryLimit * 
+               conf.getFloat(
+                   TezJobConfig.TEZ_ENGINE_SHUFFLE_MERGE_PERCENT, 
+                   TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_MERGE_PERCENT));
+    LOG.info("MergeManager: memoryLimit=" + memoryLimit + ", " +
+             "maxSingleShuffleLimit=" + maxSingleShuffleLimit + ", " +
+             "mergeThreshold=" + mergeThreshold + ", " + 
+             "ioSortFactor=" + ioSortFactor + ", " +
+             "memToMemMergeOutputsThreshold=" + memToMemMergeOutputsThreshold);
+
+    // A single shuffle that can never trip the merge threshold would stall
+    // forever (see the stall discussion in reserve()).
+    if (this.maxSingleShuffleLimit >= this.mergeThreshold) {
+      throw new RuntimeException("Invalid configuration: "
+          + "maxSingleShuffleLimit should be less than mergeThreshold "
+          + "(maxSingleShuffleLimit: " + this.maxSingleShuffleLimit
+          + ", mergeThreshold: " + this.mergeThreshold + ")");
+    }
+
+    boolean allowMemToMemMerge = 
+      conf.getBoolean(
+          TezJobConfig.TEZ_ENGINE_SHUFFLE_ENABLE_MEMTOMEM, 
+          TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_ENABLE_MEMTOMEM);
+    if (allowMemToMemMerge) {
+      this.memToMemMerger = 
+        new IntermediateMemoryToMemoryMerger(this,
+                                             memToMemMergeOutputsThreshold);
+      this.memToMemMerger.start();
+    } else {
+      this.memToMemMerger = null;
+    }
+    
+    this.inMemoryMerger = new InMemoryMerger(this);
+    this.inMemoryMerger.start();
+    
+    this.onDiskMerger = new OnDiskMerger(this);
+    this.onDiskMerger.start();
+    
+    this.mergePhase = mergePhase;
+  }
+  
+
+  /** @return the reduce-side task attempt id this manager was created for. */
+  TezTaskAttemptID getReduceId() {
+    return this.taskAttemptId;
+  }
+
+  /**
+   * Blocks until any in-progress in-memory merge completes.
+   *
+   * @throws InterruptedException if the calling thread is interrupted
+   */
+  public void waitForInMemoryMerge() throws InterruptedException {
+    inMemoryMerger.waitForMerge();
+  }
+  
+  /** Whether an output of the requested size may be shuffled into memory. */
+  private boolean canShuffleToMemory(long requestedSize) {
+    return requestedSize < maxSingleShuffleLimit;
+  }
+  
+  // Sentinel MapOutput returned by reserve() to tell a fetcher to stall.
+  final private MapOutput stallShuffle = new MapOutput(null);
+
+  /**
+   * Reserves space for a fetched map output. Returns a disk-backed MapOutput
+   * when the size exceeds maxSingleShuffleLimit, the stallShuffle sentinel
+   * when memory is over the limit, or an in-memory MapOutput otherwise.
+   *
+   * @param mapId map attempt whose output is being fetched
+   * @param requestedSize size of the map output to reserve space for
+   * @param fetcher id of the fetcher thread making the request
+   * @throws IOException if the disk-backed output cannot be set up
+   */
+  public synchronized MapOutput reserve(TezTaskAttemptID mapId, 
+                                             long requestedSize,
+                                             int fetcher
+                                             ) throws IOException {
+    if (!canShuffleToMemory(requestedSize)) {
+      LOG.info(mapId + ": Shuffling to disk since " + requestedSize + 
+               " is greater than maxSingleShuffleLimit (" + 
+               maxSingleShuffleLimit + ")");
+      return new MapOutput(mapId, this, requestedSize, conf, 
+                                localDirAllocator, fetcher, true,
+                                mapOutputFile);
+    }
+    
+    // Stall shuffle if we are above the memory limit
+
+    // It is possible that all threads could just be stalling and not make
+    // progress at all. This could happen when:
+    //
+    // requested size is causing the used memory to go above limit &&
+    // requested size < singleShuffleLimit &&
+    // current used size < mergeThreshold (merge will not get triggered)
+    //
+    // To avoid this from happening, we allow exactly one thread to go past
+    // the memory limit. We check (usedMemory > memoryLimit) and not
+    // (usedMemory + requestedSize > memoryLimit). When this thread is done
+    // fetching, this will automatically trigger a merge thereby unlocking
+    // all the stalled threads
+    
+    if (usedMemory > memoryLimit) {
+      LOG.debug(mapId + ": Stalling shuffle since usedMemory (" + usedMemory
+          + ") is greater than memoryLimit (" + memoryLimit + ")." + 
+          " CommitMemory is (" + commitMemory + ")"); 
+      return stallShuffle;
+    }
+    
+    // Allow the in-memory shuffle to progress
+    LOG.debug(mapId + ": Proceeding with shuffle since usedMemory ("
+        + usedMemory + ") is lesser than memoryLimit (" + memoryLimit + ")."
+        + "CommitMemory is (" + commitMemory + ")"); 
+    return unconditionalReserve(mapId, requestedSize, true);
+  }
+  
+  /**
+   * Reserves memory without checking the memory limit. Used by the
+   * Memory-to-Memory thread, and by reserve() once its limit check passes.
+   *
+   * @return an in-memory MapOutput backed by the reserved space
+   */
+  private synchronized MapOutput unconditionalReserve(
+      TezTaskAttemptID mapId, long requestedSize, boolean primaryMapOutput) {
+    usedMemory += requestedSize;
+    // NOTE(review): requestedSize is narrowed to int here — presumably safe
+    // because in-memory outputs are bounded by maxSingleShuffleLimit; confirm
+    // no overflow for very large limits.
+    return new MapOutput(mapId, this, (int)requestedSize, 
+        primaryMapOutput);
+  }
+  
+  /** Releases previously reserved (and committed) memory of the given size. */
+  synchronized void unreserve(long size) {
+    usedMemory -= size;
+    commitMemory -= size;
+  }
+
+  /**
+   * Registers a fully fetched in-memory map output. May trigger the
+   * in-memory merger (once commitMemory reaches mergeThreshold) and/or the
+   * memory-to-memory merger (once enough in-memory segments accumulate).
+   */
+  public synchronized void closeInMemoryFile(MapOutput mapOutput) { 
+    inMemoryMapOutputs.add(mapOutput);
+    LOG.info("closeInMemoryFile -> map-output of size: " + mapOutput.getSize()
+        + ", inMemoryMapOutputs.size() -> " + inMemoryMapOutputs.size()
+        + ", commitMemory -> " + commitMemory + ", usedMemory ->" + usedMemory);
+
+    commitMemory+= mapOutput.getSize();
+
+    synchronized (inMemoryMerger) {
+      // Can hang if mergeThreshold is really low.
+      if (!inMemoryMerger.isInProgress() && commitMemory >= mergeThreshold) {
+        LOG.info("Starting inMemoryMerger's merge since commitMemory=" +
+            commitMemory + " > mergeThreshold=" + mergeThreshold + 
+            ". Current usedMemory=" + usedMemory);
+        // Fold previously mem-to-mem merged outputs into this merge pass.
+        inMemoryMapOutputs.addAll(inMemoryMergedMapOutputs);
+        inMemoryMergedMapOutputs.clear();
+        inMemoryMerger.startMerge(inMemoryMapOutputs);
+      } 
+    }
+    
+    if (memToMemMerger != null) {
+      synchronized (memToMemMerger) {
+        if (!memToMemMerger.isInProgress() && 
+            inMemoryMapOutputs.size() >= memToMemMergeOutputsThreshold) {
+          memToMemMerger.startMerge(inMemoryMapOutputs);
+        }
+      }
+    }
+  }
+  
+  
+  /**
+   * Registers the in-memory output produced by a memory-to-memory merge.
+   */
+  public synchronized void closeInMemoryMergedFile(MapOutput mapOutput) {
+    inMemoryMergedMapOutputs.add(mapOutput);
+    LOG.info("closeInMemoryMergedFile -> size: " + mapOutput.getSize() + 
+             ", inMemoryMergedMapOutputs.size() -> " + 
+             inMemoryMergedMapOutputs.size());
+  }
+  
+  /**
+   * Registers a map output written to local disk. Triggers the on-disk
+   * merger once enough files accumulate (2 * ioSortFactor - 1).
+   */
+  public synchronized void closeOnDiskFile(Path file) {
+    onDiskMapOutputs.add(file);
+    
+    synchronized (onDiskMerger) {
+      if (!onDiskMerger.isInProgress() && 
+          onDiskMapOutputs.size() >= (2 * ioSortFactor - 1)) {
+        onDiskMerger.startMerge(onDiskMapOutputs);
+      }
+    }
+  }
+  
+  /**
+   * Shuts down the merger threads and performs the final merge of all
+   * remaining in-memory and on-disk outputs.
+   *
+   * @return iterator over the fully merged key/value stream for the reducer
+   */
+  public TezRawKeyValueIterator close() throws Throwable {
+    // Wait for on-going merges to complete
+    if (memToMemMerger != null) { 
+      memToMemMerger.close();
+    }
+    inMemoryMerger.close();
+    onDiskMerger.close();
+    
+    List<MapOutput> memory = 
+      new ArrayList<MapOutput>(inMemoryMergedMapOutputs);
+    memory.addAll(inMemoryMapOutputs);
+    List<Path> disk = new ArrayList<Path>(onDiskMapOutputs);
+    return finalMerge(conf, rfs, memory, disk);
+  }
+   
+  /**
+   * Runs the configured combiner over kvIter, writing combined records to
+   * the given writer.
+   * NOTE(review): if combineProcessor.process throws, combineIn/combineOut
+   * are not closed here — confirm cleanup is handled by the caller.
+   */
+  void runCombineProcessor(TezRawKeyValueIterator kvIter, Writer writer)
+  throws IOException, InterruptedException {
+
+    CombineInput combineIn = new CombineInput(kvIter);
+    combineIn.initialize(conf, reporter);
+    
+    CombineOutput combineOut = new CombineOutput(writer);
+    combineOut.initialize(conf, reporter);
+    
+    combineProcessor.process(combineIn, combineOut);
+    
+    combineIn.close();
+    combineOut.close();
+  
+  }
+
+  /**
+   * Merger thread that combines several small in-memory map outputs into one
+   * larger in-memory output, reducing the number of segments to merge later.
+   */
+  private class IntermediateMemoryToMemoryMerger 
+  extends MergeThread<MapOutput> {
+    
+    public IntermediateMemoryToMemoryMerger(MergeManager manager, 
+                                            int mergeFactor) {
+      super(manager, mergeFactor, exceptionReporter);
+      // NOTE(review): thread name says "InMemoryMerger" — looks like a
+      // copy-paste from the InMemoryMerger class; confirm before renaming.
+      setName("InMemoryMerger - Thread to do in-memory merge of in-memory " +
+      		    "shuffled map-outputs");
+      setDaemon(true);
+    }
+
+    @Override
+    public void merge(List<MapOutput> inputs) throws IOException {
+      if (inputs == null || inputs.size() == 0) {
+        return;
+      }
+
+      // Reuse the first input's attempt id as the id of the merged output.
+      TezTaskAttemptID dummyMapId = inputs.get(0).getMapId(); 
+      List<Segment> inMemorySegments = new ArrayList<Segment>();
+      long mergeOutputSize = 
+        createInMemorySegments(inputs, inMemorySegments, 0);
+      int noInMemorySegments = inMemorySegments.size();
+      
+      // Reserve memory for the merged output, bypassing the limit check.
+      MapOutput mergedMapOutputs = 
+        unconditionalReserve(dummyMapId, mergeOutputSize, false);
+      
+      Writer writer = 
+        new InMemoryWriter(mergedMapOutputs.getArrayStream());
+      
+      LOG.info("Initiating Memory-to-Memory merge with " + noInMemorySegments +
+               " segments of total-size: " + mergeOutputSize);
+
+      TezRawKeyValueIterator rIter = 
+        TezMerger.merge(conf, rfs,
+                       ConfigUtils.getMapOutputKeyClass(conf),
+                       ConfigUtils.getMapOutputValueClass(conf),
+                       inMemorySegments, inMemorySegments.size(),
+                       new Path(taskAttemptId.toString()),
+                       (RawComparator)ConfigUtils.getOutputKeyComparator(conf),
+                       reporter, null, null, null);
+      TezMerger.writeFile(rIter, writer, reporter, conf);
+      writer.close();
+
+      LOG.info(taskAttemptId +  
+               " Memory-to-Memory merge of the " + noInMemorySegments +
+               " files in-memory complete.");
+
+      // Note the output of the merge
+      closeInMemoryMergedFile(mergedMapOutputs);
+    }
+  }
+  
+  /**
+   * Merger thread that spills a batch of in-memory map outputs to a single
+   * on-disk file, optionally running the combiner in the process.
+   */
+  private class InMemoryMerger extends MergeThread<MapOutput> {
+    
+    public InMemoryMerger(MergeManager manager) {
+      super(manager, Integer.MAX_VALUE, exceptionReporter);
+      setName
+      ("InMemoryMerger - Thread to merge in-memory shuffled map-outputs");
+      setDaemon(true);
+    }
+    
+    @Override
+    public void merge(List<MapOutput> inputs) throws IOException, InterruptedException {
+      if (inputs == null || inputs.size() == 0) {
+        return;
+      }
+      
+      //name this output file same as the name of the first file that is 
+      //there in the current list of inmem files (this is guaranteed to
+      //be absent on the disk currently. So we don't overwrite a prev. 
+      //created spill). Also we need to create the output file now since
+      //it is not guaranteed that this file will be present after merge
+      //is called (we delete empty files as soon as we see them
+      //in the merge method)
+
+      //figure out the mapId 
+      TezTaskAttemptID mapId = inputs.get(0).getMapId();
+      TezTaskID mapTaskId = mapId.getTaskID();
+
+      List<Segment> inMemorySegments = new ArrayList<Segment>();
+      long mergeOutputSize = 
+        createInMemorySegments(inputs, inMemorySegments,0);
+      int noInMemorySegments = inMemorySegments.size();
+
+      Path outputPath = 
+        mapOutputFile.getInputFileForWrite(mapTaskId,
+                                           mergeOutputSize).suffix(
+                                               Constants.MERGED_OUTPUT_PREFIX);
+
+      Writer writer = 
+        new Writer(conf, rfs, outputPath,
+                        (Class)ConfigUtils.getMapOutputKeyClass(conf),
+                        (Class)ConfigUtils.getMapOutputValueClass(conf),
+                        codec, null);
+
+      TezRawKeyValueIterator rIter = null;
+      try {
+        LOG.info("Initiating in-memory merge with " + noInMemorySegments + 
+                 " segments...");
+        
+        rIter = TezMerger.merge(conf, rfs,
+                             (Class)ConfigUtils.getMapOutputKeyClass(conf),
+                             (Class)ConfigUtils.getMapOutputValueClass(conf),
+                             inMemorySegments, inMemorySegments.size(),
+                             new Path(taskAttemptId.toString()),
+                             (RawComparator)ConfigUtils.getOutputKeyComparator(conf),
+                             reporter, spilledRecordsCounter, null, null);
+        
+        // Run the combiner over the merged stream if one is configured.
+        if (null == combineProcessor) {
+          TezMerger.writeFile(rIter, writer, reporter, conf);
+        } else {
+          runCombineProcessor(rIter, writer);
+        }
+        writer.close();
+
+        LOG.info(taskAttemptId +  
+            " Merge of the " + noInMemorySegments +
+            " files in-memory complete." +
+            " Local file is " + outputPath + " of size " + 
+            localFS.getFileStatus(outputPath).getLen());
+      } catch (IOException e) { 
+        //make sure that we delete the ondisk file that we created 
+        //earlier when we invoked cloneFileAttributes
+        localFS.delete(outputPath, true);
+        throw e;
+      }
+
+      // Note the output of the merge
+      closeOnDiskFile(outputPath);
+    }
+
+  }
+  
+  /**
+   * Merger thread that merges a batch of on-disk map-output files into a
+   * single larger on-disk file.
+   */
+  private class OnDiskMerger extends MergeThread<Path> {
+    
+    public OnDiskMerger(MergeManager manager) {
+      super(manager, Integer.MAX_VALUE, exceptionReporter);
+      setName("OnDiskMerger - Thread to merge on-disk map-outputs");
+      setDaemon(true);
+    }
+    
+    @Override
+    public void merge(List<Path> inputs) throws IOException {
+      // sanity check
+      if (inputs == null || inputs.isEmpty()) {
+        LOG.info("No ondisk files to merge...");
+        return;
+      }
+      
+      long approxOutputSize = 0;
+      int bytesPerSum = 
+        conf.getInt("io.bytes.per.checksum", 512);
+      
+      LOG.info("OnDiskMerger: We have  " + inputs.size() + 
+               " map outputs on disk. Triggering merge...");
+      
+      // 1. Prepare the list of files to be merged. 
+      for (Path file : inputs) {
+        approxOutputSize += localFS.getFileStatus(file).getLen();
+      }
+
+      // add the checksum length
+      approxOutputSize += 
+        ChecksumFileSystem.getChecksumLength(approxOutputSize, bytesPerSum);
+
+      // 2. Start the on-disk merge process
+      Path outputPath = 
+        localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(), 
+            approxOutputSize, conf).suffix(Constants.MERGED_OUTPUT_PREFIX);
+      Writer writer = 
+        new Writer(conf, rfs, outputPath, 
+                        (Class)ConfigUtils.getMapOutputKeyClass(conf), 
+                        (Class)ConfigUtils.getMapOutputValueClass(conf),
+                        codec, null);
+      TezRawKeyValueIterator iter  = null;
+      Path tmpDir = new Path(taskAttemptId.toString());
+      try {
+        iter = TezMerger.merge(conf, rfs,
+                            (Class)ConfigUtils.getMapOutputKeyClass(conf), 
+                            (Class)ConfigUtils.getMapOutputValueClass(conf),
+                            codec, inputs.toArray(new Path[inputs.size()]), 
+                            true, ioSortFactor, tmpDir, 
+                            (RawComparator)ConfigUtils.getOutputKeyComparator(conf), 
+                            reporter, spilledRecordsCounter, null, 
+                            mergedMapOutputsCounter, null);
+
+        TezMerger.writeFile(iter, writer, reporter, conf);
+        writer.close();
+      } catch (IOException e) {
+        // Clean up the partially written output before propagating.
+        localFS.delete(outputPath, true);
+        throw e;
+      }
+
+      closeOnDiskFile(outputPath);
+
+      LOG.info(taskAttemptId +
+          " Finished merging " + inputs.size() + 
+          " map output files on disk of total-size " + 
+          approxOutputSize + "." + 
+          " Local output file is " + outputPath + " of size " +
+          localFS.getFileStatus(outputPath).getLen());
+    }
+  }
+  
+  /**
+   * Drains map outputs from the given list into IFile segments until at
+   * most leaveBytes worth of data remains in the list.
+   * Note: the inMemoryMapOutputs parameter shadows the field of the same
+   * name — this method operates only on the list passed in.
+   *
+   * @return total bytes moved into inMemorySegments
+   */
+  private long createInMemorySegments(List<MapOutput> inMemoryMapOutputs,
+                                      List<Segment> inMemorySegments, 
+                                      long leaveBytes
+                                      ) throws IOException {
+    long totalSize = 0L;
+    // We could use fullSize could come from the RamManager, but files can be
+    // closed but not yet present in inMemoryMapOutputs
+    long fullSize = 0L;
+    for (MapOutput mo : inMemoryMapOutputs) {
+      fullSize += mo.getMemory().length;
+    }
+    while(fullSize > leaveBytes) {
+      MapOutput mo = inMemoryMapOutputs.remove(0);
+      byte[] data = mo.getMemory();
+      long size = data.length;
+      totalSize += size;
+      fullSize -= size;
+      IFile.Reader reader = new InMemoryReader(MergeManager.this, 
+                                                   mo.getMapId(),
+                                                   data, 0, (int)size);
+      inMemorySegments.add(new Segment(reader, true, 
+                                            (mo.isPrimaryMapOutput() ? 
+                                            mergedMapOutputsCounter : null)));
+    }
+    return totalSize;
+  }
+
+  /**
+   * Adapter exposing a TezRawKeyValueIterator as an IFile.Reader so the
+   * result of an intermediate merge can be wrapped in a Segment for the
+   * final merge.
+   */
+  class RawKVIteratorReader extends IFile.Reader {
+
+    private final TezRawKeyValueIterator kvIter;
+
+    public RawKVIteratorReader(TezRawKeyValueIterator kvIter, long size)
+        throws IOException {
+      super(null, null, size, null, spilledRecordsCounter);
+      this.kvIter = kvIter;
+    }
+    /** Advances the iterator and copies the next key; false at end. */
+    public boolean nextRawKey(DataInputBuffer key) throws IOException {
+      if (kvIter.next()) {
+        final DataInputBuffer kb = kvIter.getKey();
+        final int kp = kb.getPosition();
+        final int klen = kb.getLength() - kp;
+        key.reset(kb.getData(), kp, klen);
+        bytesRead += klen;
+        return true;
+      }
+      return false;
+    }
+    /** Copies the value at the iterator's current position. */
+    public void nextRawValue(DataInputBuffer value) throws IOException {
+      final DataInputBuffer vb = kvIter.getValue();
+      final int vp = vb.getPosition();
+      final int vlen = vb.getLength() - vp;
+      value.reset(vb.getData(), vp, vlen);
+      bytesRead += vlen;
+    }
+    /** Position is reported as total bytes read so far. */
+    public long getPosition() throws IOException {
+      return bytesRead;
+    }
+
+    public void close() throws IOException {
+      kvIter.close();
+    }
+  }
+
+  /**
+   * Performs the final merge feeding the reducer: (1) spills enough
+   * in-memory segments to disk to satisfy the reduce-side memory limit,
+   * (2) merges on-disk segments (smallest first), and (3) merges the disk
+   * result with any remaining in-memory segments.
+   *
+   * @param job job configuration
+   * @param fs raw local file system used for the merge
+   * @param inMemoryMapOutputs remaining in-memory outputs (consumed)
+   * @param onDiskMapOutputs remaining on-disk output paths
+   * @return iterator over the fully merged key/value stream
+   */
+  private TezRawKeyValueIterator finalMerge(Configuration job, FileSystem fs,
+                                       List<MapOutput> inMemoryMapOutputs,
+                                       List<Path> onDiskMapOutputs
+                                       ) throws IOException {
+    LOG.info("finalMerge called with " + 
+             inMemoryMapOutputs.size() + " in-memory map-outputs and " + 
+             onDiskMapOutputs.size() + " on-disk map-outputs");
+    
+    final float maxRedPer =
+      job.getFloat(
+          TezJobConfig.TEZ_ENGINE_INPUT_BUFFER_PERCENT,
+          TezJobConfig.DEFAULT_TEZ_ENGINE_INPUT_BUFFER_PERCENT);
+    if (maxRedPer > 1.0 || maxRedPer < 0.0) {
+      // NOTE(review): message concatenates the key and value with no
+      // separator — consider adding ": " when this is next touched.
+      throw new IOException(TezJobConfig.TEZ_ENGINE_INPUT_BUFFER_PERCENT +
+                            maxRedPer);
+    }
+    // Bytes of map output allowed to stay in memory for the reduce phase.
+    int maxInMemReduce = (int)Math.min(
+        Runtime.getRuntime().maxMemory() * maxRedPer, Integer.MAX_VALUE);
+    
+
+    // merge config params
+    Class keyClass = (Class)ConfigUtils.getMapOutputKeyClass(job);
+    Class valueClass = (Class)ConfigUtils.getMapOutputValueClass(job);
+    final Path tmpDir = new Path(taskAttemptId.toString());
+    final RawComparator comparator =
+      (RawComparator)ConfigUtils.getOutputKeyComparator(job);
+
+    // segments required to vacate memory
+    List<Segment> memDiskSegments = new ArrayList<Segment>();
+    long inMemToDiskBytes = 0;
+    boolean mergePhaseFinished = false;
+    if (inMemoryMapOutputs.size() > 0) {
+      TezTaskID mapId = inMemoryMapOutputs.get(0).getMapId().getTaskID();
+      inMemToDiskBytes = createInMemorySegments(inMemoryMapOutputs, 
+                                                memDiskSegments,
+                                                maxInMemReduce);
+      final int numMemDiskSegments = memDiskSegments.size();
+      if (numMemDiskSegments > 0 &&
+            ioSortFactor > onDiskMapOutputs.size()) {
+        
+        // If we reach here, it implies that we have less than io.sort.factor
+        // disk segments and this will be incremented by 1 (result of the 
+        // memory segments merge). Since this total would still be 
+        // <= io.sort.factor, we will not do any more intermediate merges,
+        // the merge of all these disk segments would be directly fed to the
+        // reduce method
+        
+        mergePhaseFinished = true;
+        // must spill to disk, but can't retain in-mem for intermediate merge
+        final Path outputPath = 
+          mapOutputFile.getInputFileForWrite(mapId,
+                                             inMemToDiskBytes).suffix(
+                                                 Constants.MERGED_OUTPUT_PREFIX);
+        final TezRawKeyValueIterator rIter = TezMerger.merge(job, fs,
+            keyClass, valueClass, memDiskSegments, numMemDiskSegments,
+            tmpDir, comparator, reporter, spilledRecordsCounter, null, 
+            mergePhase);
+        final Writer writer = new Writer(job, fs, outputPath,
+            keyClass, valueClass, codec, null);
+        try {
+          TezMerger.writeFile(rIter, writer, reporter, job);
+          // add to list of final disk outputs.
+          onDiskMapOutputs.add(outputPath);
+        } catch (IOException e) {
+          if (null != outputPath) {
+            try {
+              fs.delete(outputPath, true);
+            } catch (IOException ie) {
+              // NOTHING
+            }
+          }
+          throw e;
+        } finally {
+          if (null != writer) {
+            writer.close();
+          }
+        }
+        LOG.info("Merged " + numMemDiskSegments + " segments, " +
+                 inMemToDiskBytes + " bytes to disk to satisfy " +
+                 "reduce memory limit");
+        inMemToDiskBytes = 0;
+        memDiskSegments.clear();
+      } else if (inMemToDiskBytes != 0) {
+        LOG.info("Keeping " + numMemDiskSegments + " segments, " +
+                 inMemToDiskBytes + " bytes in memory for " +
+                 "intermediate, on-disk merge");
+      }
+    }
+
+    // segments on disk
+    List<Segment> diskSegments = new ArrayList<Segment>();
+    long onDiskBytes = inMemToDiskBytes;
+    Path[] onDisk = onDiskMapOutputs.toArray(new Path[onDiskMapOutputs.size()]);
+    for (Path file : onDisk) {
+      onDiskBytes += fs.getFileStatus(file).getLen();
+      LOG.debug("Disk file: " + file + " Length is " + 
+          fs.getFileStatus(file).getLen());
+      diskSegments.add(new Segment(job, fs, file, codec, false,
+                                         (file.toString().endsWith(
+                                             Constants.MERGED_OUTPUT_PREFIX) ?
+                                          null : mergedMapOutputsCounter)
+                                        ));
+    }
+    LOG.info("Merging " + onDisk.length + " files, " +
+             onDiskBytes + " bytes from disk");
+    // Merge smallest segments first to minimize intermediate merge cost.
+    Collections.sort(diskSegments, new Comparator<Segment>() {
+      public int compare(Segment o1, Segment o2) {
+        if (o1.getLength() == o2.getLength()) {
+          return 0;
+        }
+        return o1.getLength() < o2.getLength() ? -1 : 1;
+      }
+    });
+
+    // build final list of segments from merged backed by disk + in-mem
+    List<Segment> finalSegments = new ArrayList<Segment>();
+    long inMemBytes = createInMemorySegments(inMemoryMapOutputs, 
+                                             finalSegments, 0);
+    LOG.info("Merging " + finalSegments.size() + " segments, " +
+             inMemBytes + " bytes from memory into reduce");
+    if (0 != onDiskBytes) {
+      final int numInMemSegments = memDiskSegments.size();
+      diskSegments.addAll(0, memDiskSegments);
+      memDiskSegments.clear();
+      // Pass mergePhase only if there is a going to be intermediate
+      // merges. See comment where mergePhaseFinished is being set
+      Progress thisPhase = (mergePhaseFinished) ? null : mergePhase; 
+      TezRawKeyValueIterator diskMerge = TezMerger.merge(
+          job, fs, keyClass, valueClass, diskSegments,
+          ioSortFactor, numInMemSegments, tmpDir, comparator,
+          reporter, false, spilledRecordsCounter, null, thisPhase);
+      diskSegments.clear();
+      if (0 == finalSegments.size()) {
+        return diskMerge;
+      }
+      // Wrap the disk merge as a single segment alongside in-memory ones.
+      finalSegments.add(new Segment(
+            new RawKVIteratorReader(diskMerge, onDiskBytes), true));
+    }
+    return TezMerger.merge(job, fs, keyClass, valueClass,
+                 finalSegments, finalSegments.size(), tmpDir,
+                 comparator, reporter, spilledRecordsCounter, null,
+                 null);
+  
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeThread.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeThread.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeThread.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeThread.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.common.shuffle.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
/**
 * Base class for the background merger threads used by {@link MergeManager}
 * (e.g. the in-memory and on-disk mergers). A producer hands a batch of
 * inputs to the thread via {@link #startMerge(Set)}; the thread wakes up,
 * merges them through the subclass-supplied {@link #merge(List)}, then goes
 * back to waiting for the next batch.
 *
 * Thread-safety: {@code inProgress}, {@code inputs} and {@code closed} are
 * all read and written under this object's monitor, coordinated with
 * wait/notifyAll.
 */
abstract class MergeThread<T> extends Thread {
  
  private static final Log LOG = LogFactory.getLog(MergeThread.class);

  // True while a merge is running; volatile so isInProgress() readers see it.
  private volatile boolean inProgress = false;
  // The batch currently being merged; replaced on each startMerge() call and
  // nulled out by run()'s finally block once the merge finishes.
  private List<T> inputs = new ArrayList<T>();
  protected final MergeManager manager;
  // Sink for fatal errors raised inside the merge loop.
  private final ExceptionReporter reporter;
  // Once set, startMerge() becomes a no-op; set by close().
  private boolean closed = false;
  // Maximum number of inputs consumed from the caller's set per merge round.
  private final int mergeFactor;
  
  public MergeThread(MergeManager manager, int mergeFactor,
                     ExceptionReporter reporter) {
    this.manager = manager;
    this.mergeFactor = mergeFactor;
    this.reporter = reporter;
  }
  
  /**
   * Shuts the thread down: refuses further merges, waits for any in-flight
   * merge to finish, then interrupts run() out of its wait().
   */
  public synchronized void close() throws InterruptedException {
    closed = true;
    waitForMerge();
    interrupt();
  }

  /** @return true while a merge round is executing. */
  public synchronized boolean isInProgress() {
    return inProgress;
  }
  
  /**
   * Transfers up to {@code mergeFactor} elements from the caller's set into
   * this thread's work list (removing them from the set) and wakes the merge
   * loop. Elements beyond the factor remain in the caller's set — the log
   * message's "ignoring" means "left for a later round", not dropped.
   *
   * NOTE(review): if this is invoked while a previous merge is still in
   * progress, {@code this.inputs} is reassigned under the running merge and
   * run()'s finally block will later null it out — the new batch appears to
   * be lost (upstream Hadoop fixed the equivalent race with a pending-merge
   * queue, MAPREDUCE-4842). Confirm callers only start a merge when idle.
   */
  public synchronized void startMerge(Set<T> inputs) {
    if (!closed) {
      inProgress = true;
      this.inputs = new ArrayList<T>();
      Iterator<T> iter=inputs.iterator();
      for (int ctr = 0; iter.hasNext() && ctr < mergeFactor; ++ctr) {
        this.inputs.add(iter.next());
        iter.remove();
      }
      LOG.info(getName() + ": Starting merge with " + this.inputs.size() + 
               " segments, while ignoring " + inputs.size() + " segments");
      notifyAll();
    }
  }

  /** Blocks the caller until the current merge round (if any) completes. */
  public synchronized void waitForMerge() throws InterruptedException {
    while (inProgress) {
      wait();
    }
  }

  public void run() {
    while (true) {
      try {
        // Wait for notification to start the merge...
        synchronized (this) {
          while (!inProgress) {
            wait();
          }
        }

        // Merge
        // (called outside the monitor so startMerge()/waitForMerge() callers
        // are not blocked for the duration of the merge)
        merge(inputs);
      } catch (InterruptedException ie) {
        // Interrupt is the shutdown signal from close(); exit the loop.
        return;
      } catch(Throwable t) {
        reporter.reportException(t);
        return;
      } finally {
        synchronized (this) {
          // Clear inputs
          inputs = null;
          inProgress = false;        
          notifyAll();  // wake waitForMerge()/close() waiters
        }
      }
    }
  }

  /** Subclass hook performing the actual merge of one batch of inputs. */
  public abstract void merge(List<T> inputs) 
      throws IOException, InterruptedException;
}

Propchange: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeThread.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.common.shuffle.impl;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.util.Progress;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezTask;
+import org.apache.tez.common.TezTaskReporter;
+import org.apache.tez.common.TezTaskStatus;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.engine.api.Processor;
+import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class Shuffle implements ExceptionReporter {
+  
+  private static final Log LOG = LogFactory.getLog(Shuffle.class);
+  private static final int PROGRESS_FREQUENCY = 2000;
+  private static final int MAX_EVENTS_TO_FETCH = 10000;
+  private static final int MIN_EVENTS_TO_FETCH = 100;
+  private static final int MAX_RPC_OUTSTANDING_EVENTS = 3000000;
+
+  private final TezTask task;
+  private final Configuration conf;
+  private final TezTaskReporter reporter;
+  private final ShuffleClientMetrics metrics;
+  
+  private final ShuffleScheduler scheduler;
+  private final MergeManager merger;
+  private Throwable throwable = null;
+  private String throwingThreadName = null;
+  private final Progress copyPhase;
+  private final Progress mergePhase;
+  private final int tasksInDegree;
+  
+  public Shuffle(TezTask task, 
+                 Configuration conf,
+                 int tasksInDegree,
+                 TezTaskReporter reporter,
+                 Processor combineProcessor
+                 ) throws IOException {
+    this.task = task;
+    this.conf = conf;
+    this.reporter = reporter;
+    this.metrics = 
+        new ShuffleClientMetrics(
+            task.getTaskAttemptId(), this.conf, 
+            this.task.getUser(), this.task.getJobName());
+    this.tasksInDegree = tasksInDegree;
+    
+    FileSystem localFS = FileSystem.getLocal(this.conf);
+    LocalDirAllocator localDirAllocator = 
+        new LocalDirAllocator(TezJobConfig.LOCAL_DIR);
+    
+    copyPhase = this.task.getProgress().addPhase("copy", 0.33f);
+    mergePhase = this.task.getProgress().addPhase("merge", 0.66f);
+
+    TezCounter shuffledMapsCounter = 
+        reporter.getCounter(TaskCounter.SHUFFLED_MAPS);
+    TezCounter reduceShuffleBytes =
+        reporter.getCounter(TaskCounter.REDUCE_SHUFFLE_BYTES);
+    TezCounter failedShuffleCounter =
+        reporter.getCounter(TaskCounter.FAILED_SHUFFLE);
+    TezCounter spilledRecordsCounter = 
+        reporter.getCounter(TaskCounter.SPILLED_RECORDS);
+    TezCounter reduceCombineInputCounter =
+        reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
+    TezCounter mergedMapOutputsCounter =
+        reporter.getCounter(TaskCounter.MERGED_MAP_OUTPUTS);
+    
+    scheduler = 
+      new ShuffleScheduler(this.conf, tasksInDegree, task.getStatus(), 
+                                this, copyPhase, 
+                                shuffledMapsCounter, 
+                                reduceShuffleBytes, 
+                                failedShuffleCounter);
+    merger = new MergeManager(this.task.getTaskAttemptId(), 
+                                    this.conf, localFS, 
+                                    localDirAllocator, reporter, 
+                                    combineProcessor, 
+                                    spilledRecordsCounter, 
+                                    reduceCombineInputCounter, 
+                                    mergedMapOutputsCounter, 
+                                    this, mergePhase);
+  }
+
+  public TezRawKeyValueIterator run() throws IOException, InterruptedException {
+    // Scale the maximum events we fetch per RPC call to mitigate OOM issues
+    // on the ApplicationMaster when a thundering herd of reducers fetch events
+    // TODO: This should not be necessary after HADOOP-8942
+    int eventsPerReducer = Math.max(MIN_EVENTS_TO_FETCH,
+        MAX_RPC_OUTSTANDING_EVENTS / tasksInDegree);
+    int maxEventsToFetch = Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer);
+
+    // Start the map-completion events fetcher thread
+    final EventFetcher eventFetcher = 
+      new EventFetcher(task.getTaskAttemptId(), reporter, scheduler, this,
+          maxEventsToFetch);
+    eventFetcher.start();
+    
+    // Start the map-output fetcher threads
+    final int numFetchers = 
+        conf.getInt(
+            TezJobConfig.TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES, 
+            TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES);
+    Fetcher[] fetchers = new Fetcher[numFetchers];
+    for (int i=0; i < numFetchers; ++i) {
+      fetchers[i] = new Fetcher(conf, task.getTaskAttemptId(), 
+                                     scheduler, merger, 
+                                     reporter, metrics, this, 
+                                     task.getJobTokenSecret());
+      fetchers[i].start();
+    }
+    
+    // Wait for shuffle to complete successfully
+    while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {
+      reporter.progress();
+      
+      synchronized (this) {
+        if (throwable != null) {
+          throw new ShuffleError("error in shuffle in " + throwingThreadName,
+                                 throwable);
+        }
+      }
+    }
+
+    // Stop the event-fetcher thread
+    eventFetcher.shutDown();
+    
+    // Stop the map-output fetcher threads
+    for (Fetcher fetcher : fetchers) {
+      fetcher.shutDown();
+    }
+    fetchers = null;
+    
+    // stop the scheduler
+    scheduler.close();
+
+    copyPhase.complete(); // copy is already complete
+    task.getStatus().setPhase(TezTaskStatus.Phase.SORT);
+    
+    task.statusUpdate();
+    
+    // Finish the on-going merges...
+    TezRawKeyValueIterator kvIter = null;
+    try {
+      kvIter = merger.close();
+    } catch (Throwable e) {
+      throw new ShuffleError("Error while doing final merge " , e);
+    }
+
+    // Sanity check
+    synchronized (this) {
+      if (throwable != null) {
+        throw new ShuffleError("error in shuffle in " + throwingThreadName,
+                               throwable);
+      }
+    }
+    
+    return kvIter;
+  }
+
+  public synchronized void reportException(Throwable t) {
+    if (throwable == null) {
+      throwable = t;
+      throwingThreadName = Thread.currentThread().getName();
+      // Notify the scheduler so that the reporting thread finds the 
+      // exception immediately.
+      synchronized (scheduler) {
+        scheduler.notifyAll();
+      }
+    }
+  }
+  
+  public static class ShuffleError extends IOException {
+    private static final long serialVersionUID = 5753909320586607881L;
+
+    ShuffleError(String msg, Throwable t) {
+      super(msg, t);
+    }
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleClientMetrics.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleClientMetrics.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleClientMetrics.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleClientMetrics.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.common.shuffle.impl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.Updater;
+import org.apache.tez.common.Constants;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+
+class ShuffleClientMetrics implements Updater {
+
+  private MetricsRecord shuffleMetrics = null;
+  private int numFailedFetches = 0;
+  private int numSuccessFetches = 0;
+  private long numBytes = 0;
+  private int numThreadsBusy = 0;
+  private final int numCopiers;
+  
+  ShuffleClientMetrics(TezTaskAttemptID reduceId, Configuration jobConf, 
+      String user, String jobName) {
+    this.numCopiers = 
+        jobConf.getInt(
+            TezJobConfig.TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES, 
+            TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES);
+
+    MetricsContext metricsContext = MetricsUtil.getContext(Constants.TEZ);
+    this.shuffleMetrics = 
+      MetricsUtil.createRecord(metricsContext, "shuffleInput");
+    this.shuffleMetrics.setTag("user", user);
+    this.shuffleMetrics.setTag("jobName", jobName);
+    this.shuffleMetrics.setTag("jobId", 
+        reduceId.getTaskID().getVertexID().getDAGId().toString());
+    this.shuffleMetrics.setTag("taskId", reduceId.toString());
+    this.shuffleMetrics.setTag("sessionId", 
+        jobConf.get(
+            TezJobConfig.TEZ_ENGINE_METRICS_SESSION_ID, 
+            TezJobConfig.DEFAULT_TEZ_ENGINE_METRICS_SESSION_ID));
+    metricsContext.registerUpdater(this);
+  }
+  public synchronized void inputBytes(long numBytes) {
+    this.numBytes += numBytes;
+  }
+  public synchronized void failedFetch() {
+    ++numFailedFetches;
+  }
+  public synchronized void successFetch() {
+    ++numSuccessFetches;
+  }
+  public synchronized void threadBusy() {
+    ++numThreadsBusy;
+  }
+  public synchronized void threadFree() {
+    --numThreadsBusy;
+  }
+  public void doUpdates(MetricsContext unused) {
+    synchronized (this) {
+      shuffleMetrics.incrMetric("shuffle_input_bytes", numBytes);
+      shuffleMetrics.incrMetric("shuffle_failed_fetches", 
+                                numFailedFetches);
+      shuffleMetrics.incrMetric("shuffle_success_fetches", 
+                                numSuccessFetches);
+      if (numCopiers != 0) {
+        shuffleMetrics.setMetric("shuffle_fetchers_busy_percent",
+            100*((float)numThreadsBusy/numCopiers));
+      } else {
+        shuffleMetrics.setMetric("shuffle_fetchers_busy_percent", 0);
+      }
+      numBytes = 0;
+      numSuccessFetches = 0;
+      numFailedFetches = 0;
+    }
+    shuffleMetrics.update();
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleClientMetrics.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleHeader.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleHeader.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleHeader.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleHeader.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.common.shuffle.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * Shuffle Header information that is sent by the TaskTracker and 
+ * deciphered by the Fetcher thread of Reduce task
+ *
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+public class ShuffleHeader implements Writable {
+
+  /**
+   * The longest possible length of task attempt id that we will accept.
+   */
+  private static final int MAX_ID_LENGTH = 1000;
+
+  String mapId;
+  long uncompressedLength;
+  long compressedLength;
+  int forReduce;
+  
+  public ShuffleHeader() { }
+  
+  public ShuffleHeader(String mapId, long compressedLength,
+      long uncompressedLength, int forReduce) {
+    this.mapId = mapId;
+    this.compressedLength = compressedLength;
+    this.uncompressedLength = uncompressedLength;
+    this.forReduce = forReduce;
+  }
+  
+  public long getUncompressedLength() {
+    return uncompressedLength;
+  }
+
+  public long getCompressedLength() {
+    return compressedLength;
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    mapId = WritableUtils.readStringSafely(in, MAX_ID_LENGTH);
+    compressedLength = WritableUtils.readVLong(in);
+    uncompressedLength = WritableUtils.readVLong(in);
+    forReduce = WritableUtils.readVInt(in);
+  }
+
+  public void write(DataOutput out) throws IOException {
+    Text.writeString(out, mapId);
+    WritableUtils.writeVLong(out, compressedLength);
+    WritableUtils.writeVLong(out, uncompressedLength);
+    WritableUtils.writeVInt(out, forReduce);
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleHeader.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleScheduler.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleScheduler.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleScheduler.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleScheduler.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,445 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.common.shuffle.impl;
+
+import java.io.IOException;
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.util.Progress;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezTaskStatus;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.engine.records.TezTaskID;
+
+class ShuffleScheduler {
  // Per-fetcher-thread timestamp of when its current shuffle started
  // (set in getHost(), read by fetchers for timing).
  static ThreadLocal<Long> shuffleStart = new ThreadLocal<Long>() {
    protected Long initialValue() {
      return 0L;
    }
  };

  private static final Log LOG = LogFactory.getLog(ShuffleScheduler.class);
  // Cap on map outputs handed to a single fetcher per host visit.
  private static final int MAX_MAPS_AT_ONCE = 20;
  // Base penalty (ms) applied to a host after a failed copy; grows
  // geometrically with the failure count (see copyFailed()).
  private static final long INITIAL_PENALTY = 10000;
  private static final float PENALTY_GROWTH_RATE = 1.3f;
  
  // finishedMaps[i] is true once the output of map task index i is copied
  // (or its source task failed); indexed by TezTaskID.getId().
  private final boolean[] finishedMaps;
  private final int tasksInDegree;
  private int remainingMaps;
  // Host name -> known map outputs on that host; guarded by 'this'.
  private Map<String, MapHost> mapLocations = new HashMap<String, MapHost>();
  // Hosts with outputs ready to fetch; fetchers take from here via getHost().
  private Set<MapHost> pendingHosts = new HashSet<MapHost>();
  // Map attempts whose outputs must no longer be fetched.
  private Set<TezTaskAttemptID> obsoleteMaps = new HashSet<TezTaskAttemptID>();
  
  private final Random random = new Random(System.currentTimeMillis());
  // Hosts serving out their failure penalty; drained by the Referee thread.
  private final DelayQueue<Penalty> penalties = new DelayQueue<Penalty>();
  private final Referee referee = new Referee();
  // Per-attempt and per-host fetch-failure counts.
  private final Map<TezTaskAttemptID,IntWritable> failureCounts =
    new HashMap<TezTaskAttemptID,IntWritable>();
  private final Map<String,IntWritable> hostFailures = 
    new HashMap<String,IntWritable>();
  private final TezTaskStatus status;
  private final ExceptionReporter reporter;
  // Per-attempt failure count at which the whole task is aborted.
  private final int abortFailureLimit;
  private final Progress progress;
  private final TezCounter shuffledMapsCounter;
  private final TezCounter reduceShuffleBytes;
  private final TezCounter failedShuffleCounter;
  
  private final long startTime;
  private long lastProgressTime;
  
  // Longest observed map runtime; used by the reducer-health heuristic.
  private int maxMapRuntime = 0;
  // Distinct failing map attempts tolerated before considering abort.
  private int maxFailedUniqueFetches = 5;
  private int maxFetchFailuresBeforeReporting;
  
  private long totalBytesShuffledTillNow = 0;
  private DecimalFormat  mbpsFormat = new DecimalFormat("0.00");

  // If true, read errors are reported to the AM on first occurrence.
  private boolean reportReadErrorImmediately = true;
+  
  /**
   * Builds the scheduler for {@code tasksInDegree} upstream map outputs and
   * starts the Referee penalty thread.
   *
   * @param conf job configuration (fetch-failure reporting knobs)
   * @param tasksInDegree number of map outputs this reducer must fetch
   * @param status task status to update with copy progress
   * @param reporter sink for fatal shuffle errors
   * @param progress the "copy" phase progress object
   */
  public ShuffleScheduler(Configuration conf,
                          int tasksInDegree,
                          TezTaskStatus status,
                          ExceptionReporter reporter,
                          Progress progress,
                          TezCounter shuffledMapsCounter,
                          TezCounter reduceShuffleBytes,
                          TezCounter failedShuffleCounter) {
    this.tasksInDegree = tasksInDegree;
    // Abort after 30 failures, or 10% of the in-degree for large jobs.
    abortFailureLimit = Math.max(30, tasksInDegree / 10);
    remainingMaps = tasksInDegree;
    finishedMaps = new boolean[remainingMaps];
    this.reporter = reporter;
    this.status = status;
    this.progress = progress;
    this.shuffledMapsCounter = shuffledMapsCounter;
    this.reduceShuffleBytes = reduceShuffleBytes;
    this.failedShuffleCounter = failedShuffleCounter;
    this.startTime = System.currentTimeMillis();
    lastProgressTime = startTime;
    // NOTE(review): the Referee thread is started before the fields below
    // are assigned — it observes a partially constructed object. Appears
    // benign because Referee only drains 'penalties', but worth confirming.
    referee.start();
    // Cannot require more distinct failed fetches than there are maps.
    this.maxFailedUniqueFetches = Math.min(tasksInDegree,
        this.maxFailedUniqueFetches);
    this.maxFetchFailuresBeforeReporting = 
        conf.getInt(
            TezJobConfig.TEZ_ENGINE_SHUFFLE_FETCH_FAILURES, 
            TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_FETCH_FAILURES_LIMIT);
    this.reportReadErrorImmediately = 
        conf.getBoolean(
            TezJobConfig.TEZ_ENGINE_SHUFFLE_NOTIFY_READERROR, 
            TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_NOTIFY_READERROR);
  }
+
  /**
   * Records a successful copy of one map output: clears failure history for
   * the attempt and host, commits the output, and updates progress/counters.
   * Wakes waiters when the last remaining map output arrives.
   *
   * NOTE(review): the 'millis' parameter is not used in this method body.
   *
   * @param mapId attempt whose output was copied
   * @param host source host
   * @param bytes bytes copied
   * @param millis copy duration (unused here)
   * @param output the fetched output to commit
   * @throws IOException if committing the output fails
   */
  public synchronized void copySucceeded(TezTaskAttemptID mapId, 
                                         MapHost host,
                                         long bytes,
                                         long millis,
                                         MapOutput output
                                         ) throws IOException {
    failureCounts.remove(mapId);
    hostFailures.remove(host.getHostName());
    int mapIndex = mapId.getTaskID().getId();
    
    // Duplicate copies of an already-finished map are silently dropped.
    if (!finishedMaps[mapIndex]) {
      output.commit();
      finishedMaps[mapIndex] = true;
      shuffledMapsCounter.increment(1);
      if (--remainingMaps == 0) {
        notifyAll();  // all copies done — wake waitUntilDone()
      }

      // update the status
      totalBytesShuffledTillNow += bytes;
      updateStatus();
      reduceShuffleBytes.increment(bytes);
      lastProgressTime = System.currentTimeMillis();
      LOG.debug("map " + mapId + " done " + status.getStateString());
    }
  }
+  
+  private void updateStatus() {
+    float mbs = (float) totalBytesShuffledTillNow / (1024 * 1024);
+    int mapsDone = tasksInDegree - remainingMaps;
+    long secsSinceStart = (System.currentTimeMillis() - startTime) / 1000 + 1;
+
+    float transferRate = mbs / secsSinceStart;
+    progress.set((float) mapsDone / tasksInDegree);
+    String statusString = mapsDone + " / " + tasksInDegree + " copied.";
+    status.setStateString(statusString);
+
+    progress.setStatus("copy(" + mapsDone + " of " + tasksInDegree + " at "
+        + mbpsFormat.format(transferRate) + " MB/s)");
+  }
+
+  public synchronized void copyFailed(TezTaskAttemptID mapId, MapHost host,
+                                      boolean readError) {
+    host.penalize();
+    int failures = 1;
+    if (failureCounts.containsKey(mapId)) {
+      IntWritable x = failureCounts.get(mapId);
+      x.set(x.get() + 1);
+      failures = x.get();
+    } else {
+      failureCounts.put(mapId, new IntWritable(1));      
+    }
+    String hostname = host.getHostName();
+    if (hostFailures.containsKey(hostname)) {
+      IntWritable x = hostFailures.get(hostname);
+      x.set(x.get() + 1);
+    } else {
+      hostFailures.put(hostname, new IntWritable(1));
+    }
+    if (failures >= abortFailureLimit) {
+      try {
+        throw new IOException(failures + " failures downloading " + mapId);
+      } catch (IOException ie) {
+        reporter.reportException(ie);
+      }
+    }
+    
+    checkAndInformJobTracker(failures, mapId, readError);
+
+    checkReducerHealth();
+    
+    long delay = (long) (INITIAL_PENALTY *
+        Math.pow(PENALTY_GROWTH_RATE, failures));
+    
+    penalties.add(new Penalty(host, delay));
+    
+    failedShuffleCounter.increment(1);
+  }
+  
+  // Notify the JobTracker  
+  // after every read error, if 'reportReadErrorImmediately' is true or
+  // after every 'maxFetchFailuresBeforeReporting' failures
+  private void checkAndInformJobTracker(
+      int failures, TezTaskAttemptID mapId, boolean readError) {
+    if ((reportReadErrorImmediately && readError)
+        || ((failures % maxFetchFailuresBeforeReporting) == 0)) {
+      LOG.info("Reporting fetch failure for " + mapId + " to jobtracker.");
+      status.addFailedDependency(mapId);
+    }
+  }
+    
  /**
   * Heuristic health check: aborts this reducer (by reporting an exception)
   * when fetch failures dominate, too many distinct maps are failing, and
   * progress has either stalled or not advanced far enough.
   */
  private void checkReducerHealth() {
    final float MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT = 0.5f;
    final float MIN_REQUIRED_PROGRESS_PERCENT = 0.5f;
    final float MAX_ALLOWED_STALL_TIME_PERCENT = 0.5f;

    long totalFailures = failedShuffleCounter.getValue();
    int doneMaps = tasksInDegree - remainingMaps;
    
    // Healthy if fewer than half of all fetch attempts failed.
    // NOTE(review): on the very first failure both totalFailures and
    // doneMaps can be 0, making this float division NaN; NaN < 0.5f is
    // false, so the reducer is treated as unhealthy — confirm intended.
    boolean reducerHealthy =
      (((float)totalFailures / (totalFailures + doneMaps))
          < MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT);
    
    // check if the reducer has progressed enough
    boolean reducerProgressedEnough =
      (((float)doneMaps / tasksInDegree)
          >= MIN_REQUIRED_PROGRESS_PERCENT);

    // check if the reducer is stalled for a long time
    // duration for which the reducer is stalled
    int stallDuration =
      (int)(System.currentTimeMillis() - lastProgressTime);
    
    // duration for which the reducer ran with progress
    int shuffleProgressDuration =
      (int)(lastProgressTime - startTime);

    // min time the reducer should run without getting killed
    int minShuffleRunDuration =
      (shuffleProgressDuration > maxMapRuntime)
      ? shuffleProgressDuration
          : maxMapRuntime;
    
    boolean reducerStalled =
      (((float)stallDuration / minShuffleRunDuration)
          >= MAX_ALLOWED_STALL_TIME_PERCENT);

    // kill if not healthy and has insufficient progress
    if ((failureCounts.size() >= maxFailedUniqueFetches ||
        failureCounts.size() == (tasksInDegree - doneMaps))
        && !reducerHealthy
        && (!reducerProgressedEnough || reducerStalled)) {
      LOG.fatal("Shuffle failed with too many fetch failures " +
      "and insufficient progress!");
      String errorMsg = "Exceeded MAX_FAILED_UNIQUE_FETCHES; bailing-out.";
      reporter.reportException(new IOException(errorMsg));
    }

  }
+  
+  public synchronized void tipFailed(TezTaskID taskId) {
+    if (!finishedMaps[taskId.getId()]) {
+      finishedMaps[taskId.getId()] = true;
+      if (--remainingMaps == 0) {
+        notifyAll();
+      }
+      updateStatus();
+    }
+  }
+  
+  public synchronized void addKnownMapOutput(String hostName, 
+                                             String hostUrl,
+                                             TezTaskAttemptID mapId) {
+    MapHost host = mapLocations.get(hostName);
+    if (host == null) {
+      host = new MapHost(hostName, hostUrl);
+      mapLocations.put(hostName, host);
+    }
+    host.addKnownMap(mapId);
+
+    // Mark the host as pending
+    if (host.getState() == MapHost.State.PENDING) {
+      pendingHosts.add(host);
+      notifyAll();
+    }
+  }
+  
+  public synchronized void obsoleteMapOutput(TezTaskAttemptID mapId) {
+    obsoleteMaps.add(mapId);
+  }
+  
+  public synchronized void putBackKnownMapOutput(MapHost host, 
+                                                 TezTaskAttemptID mapId) {
+    host.addKnownMap(mapId);
+  }
+
+  public synchronized MapHost getHost() throws InterruptedException {
+      while(pendingHosts.isEmpty()) {
+        wait();
+      }
+      
+      MapHost host = null;
+      Iterator<MapHost> iter = pendingHosts.iterator();
+      int numToPick = random.nextInt(pendingHosts.size());
+      for (int i=0; i <= numToPick; ++i) {
+        host = iter.next();
+      }
+      
+      pendingHosts.remove(host);     
+      host.markBusy();
+      
+      LOG.info("Assiging " + host + " with " + host.getNumKnownMapOutputs() + 
+               " to " + Thread.currentThread().getName());
+      shuffleStart.set(System.currentTimeMillis());
+      
+      return host;
+  }
+  
+  public synchronized List<TezTaskAttemptID> getMapsForHost(MapHost host) {
+    List<TezTaskAttemptID> list = host.getAndClearKnownMaps();
+    Iterator<TezTaskAttemptID> itr = list.iterator();
+    List<TezTaskAttemptID> result = new ArrayList<TezTaskAttemptID>();
+    int includedMaps = 0;
+    int totalSize = list.size();
+    // find the maps that we still need, up to the limit
+    while (itr.hasNext()) {
+      TezTaskAttemptID id = itr.next();
+      if (!obsoleteMaps.contains(id) && !finishedMaps[id.getTaskID().getId()]) {
+        result.add(id);
+        if (++includedMaps >= MAX_MAPS_AT_ONCE) {
+          break;
+        }
+      }
+    }
+    // put back the maps left after the limit
+    while (itr.hasNext()) {
+      TezTaskAttemptID id = itr.next();
+      if (!obsoleteMaps.contains(id) && !finishedMaps[id.getTaskID().getId()]) {
+        host.addKnownMap(id);
+      }
+    }
+    LOG.info("assigned " + includedMaps + " of " + totalSize + " to " +
+             host + " to " + Thread.currentThread().getName());
+    return result;
+  }
+
+  public synchronized void freeHost(MapHost host) {
+    if (host.getState() != MapHost.State.PENALIZED) {
+      if (host.markAvailable() == MapHost.State.PENDING) {
+        pendingHosts.add(host);
+        notifyAll();
+      }
+    }
+    LOG.info(host + " freed by " + Thread.currentThread().getName() + " in " + 
+             (System.currentTimeMillis()-shuffleStart.get()) + "s");
+  }
+    
+  public synchronized void resetKnownMaps() {
+    mapLocations.clear();
+    obsoleteMaps.clear();
+    pendingHosts.clear();
+  }
+  
+  /**
+   * Wait until the shuffle finishes or until the timeout.
+   * @param millis maximum wait time
+   * @return true if the shuffle is done
+   * @throws InterruptedException
+   */
+  public synchronized boolean waitUntilDone(int millis
+                                            ) throws InterruptedException {
+    if (remainingMaps > 0) {
+      wait(millis);
+      return remainingMaps == 0;
+    }
+    return true;
+  }
+  
+  /**
+   * A structure that records the penalty for a host.
+   */
+  private static class Penalty implements Delayed {
+    MapHost host;
+    private long endTime;
+    
+    Penalty(MapHost host, long delay) {
+      this.host = host;
+      this.endTime = System.currentTimeMillis() + delay;
+    }
+
+    public long getDelay(TimeUnit unit) {
+      long remainingTime = endTime - System.currentTimeMillis();
+      return unit.convert(remainingTime, TimeUnit.MILLISECONDS);
+    }
+
+    public int compareTo(Delayed o) {
+      long other = ((Penalty) o).endTime;
+      return endTime == other ? 0 : (endTime < other ? -1 : 1);
+    }
+    
+  }
+  
  /**
   * A thread that takes hosts off of the penalty list when the timer expires.
   */
  private class Referee extends Thread {
    public Referee() {
      setName("ShufflePenaltyReferee");
      // Daemon thread: must not keep the JVM alive if the task exits.
      setDaemon(true);
    }

    public void run() {
      try {
        while (true) {
          // take the first host that has an expired penalty
          // (blocks on the delay queue until one is due)
          MapHost host = penalties.take().host;
          // Hold the scheduler lock while mutating pendingHosts and notifying.
          synchronized (ShuffleScheduler.this) {
            if (host.markAvailable() == MapHost.State.PENDING) {
              pendingHosts.add(host);
              ShuffleScheduler.this.notifyAll();
            }
          }
        }
      } catch (InterruptedException ie) {
        // Normal shutdown path: close() interrupts this thread.
        return;
      } catch (Throwable t) {
        // Surface unexpected failures to the task rather than dying silently.
        reporter.reportException(t);
      }
    }
  }
+  
  /**
   * Shuts down the penalty referee thread and waits for it to exit.
   * @throws InterruptedException if interrupted while joining the referee
   */
  public void close() throws InterruptedException {
    referee.interrupt();
    referee.join();
  }
+
+  public synchronized void informMaxMapRunTime(int duration) {
+    if (duration > maxMapRuntime) {
+      maxMapRuntime = duration;
+    }
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleScheduler.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message