tez-commits mailing list archives

From ss...@apache.org
Subject [5/9] TEZ-1479. Disambiguate (refactor) between ShuffleInputEventHandlers and Fetchers. (sseth) (cherry picked from commit 7be5830a908602ff91a07d3020f2dddf7705d48f)
Date Wed, 15 Oct 2014 18:59:41 GMT
http://git-wip-us.apache.org/repos/asf/tez/blob/196bcca9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
new file mode 100644
index 0000000..96f98cf
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
@@ -0,0 +1,665 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.SocketTimeoutException;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+import javax.crypto.SecretKey;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.tez.common.TezUtilsInternal;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.tez.common.TezRuntimeFrameworkConfigs;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.library.common.Constants;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapOutput.Type;
+import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
+import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
+import org.apache.tez.runtime.library.exceptions.FetcherReadTimeoutException;
+import org.apache.tez.runtime.library.common.shuffle.HttpConnection;
+import org.apache.tez.runtime.library.common.shuffle.HttpConnection.HttpConnectionParams;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+
+class FetcherOrderedGrouped extends Thread {
+  
+  private static final Log LOG = LogFactory.getLog(FetcherOrderedGrouped.class);
+  private final Configuration conf;
+  private final boolean localDiskFetchEnabled;
+
+  private static enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
+                                    CONNECTION, WRONG_REDUCE}
+  
+  private final static String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";
+  private final TezCounter connectionErrs;
+  private final TezCounter ioErrs;
+  private final TezCounter wrongLengthErrs;
+  private final TezCounter badIdErrs;
+  private final TezCounter wrongMapErrs;
+  private final TezCounter wrongReduceErrs;
+  private final MergeManager merger;
+  private final ShuffleScheduler scheduler;
+  private final ShuffleClientMetrics metrics;
+  private final Shuffle shuffle;
+  private final int id;
+  private final String logIdentifier;
+  private static int nextId = 0;
+  private int currentPartition = -1;
+
+  // Decompression of map-outputs
+  private final CompressionCodec codec;
+  private final SecretKey jobTokenSecret;
+
+  @VisibleForTesting
+  volatile boolean stopped = false;
+  
+  private final boolean ifileReadAhead;
+  private final int ifileReadAheadLength;
+  private LinkedHashSet<InputAttemptIdentifier> remaining;
+
+  volatile HttpURLConnection connection;
+  volatile DataInputStream input;
+
+  HttpConnection httpConnection;
+  HttpConnectionParams httpConnectionParams;
+
+  final static String localhostName = NetUtils.getHostname();
+
+  // Initial value is 0, which means no retry has been attempted yet.
+  private long retryStartTime = 0;
+  
+  public FetcherOrderedGrouped(HttpConnectionParams httpConnectionParams,
+                               ShuffleScheduler scheduler, MergeManager merger,
+                               ShuffleClientMetrics metrics,
+                               Shuffle shuffle, SecretKey jobTokenSecret,
+                               boolean ifileReadAhead, int ifileReadAheadLength,
+                               CompressionCodec codec,
+                               InputContext inputContext, Configuration conf,
+                               boolean localDiskFetchEnabled) throws IOException {
+    setDaemon(true);
+    this.scheduler = scheduler;
+    this.merger = merger;
+    this.metrics = metrics;
+    this.shuffle = shuffle;
+    this.id = ++nextId;
+    this.jobTokenSecret = jobTokenSecret;
+    ioErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
+        ShuffleErrors.IO_ERROR.toString());
+    wrongLengthErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
+        ShuffleErrors.WRONG_LENGTH.toString());
+    badIdErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
+        ShuffleErrors.BAD_ID.toString());
+    wrongMapErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
+        ShuffleErrors.WRONG_MAP.toString());
+    connectionErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
+        ShuffleErrors.CONNECTION.toString());
+    wrongReduceErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
+        ShuffleErrors.WRONG_REDUCE.toString());
+
+    this.ifileReadAhead = ifileReadAhead;
+    this.ifileReadAheadLength = ifileReadAheadLength;
+    this.httpConnectionParams = httpConnectionParams;
+    this.codec = codec;
+    this.conf = conf;
+
+    this.localDiskFetchEnabled = localDiskFetchEnabled;
+
+    this.logIdentifier = "fetcher [" + TezUtilsInternal
+        .cleanVertexName(inputContext.getSourceVertexName()) + "] #" + id + " " + localhostName;
+    setName(logIdentifier);
+  }
+
+  public void run() {
+    try {
+      while (!stopped && !Thread.currentThread().isInterrupted()) {
+        remaining = null; // Safety.
+        MapHost host = null;
+        try {
+          // If merge is on, block
+          merger.waitForInMemoryMerge();
+
+          // Get a host to shuffle from
+          host = scheduler.getHost();
+          metrics.threadBusy();
+
+          String hostPort = host.getHostIdentifier();
+          String hostname = hostPort.substring(0, hostPort.indexOf(":"));
+          if (localDiskFetchEnabled &&
+              hostname.equals(System.getenv(ApplicationConstants.Environment.NM_HOST.toString()))) {
+            setupLocalDiskFetch(host);
+          } else {
+            // Shuffle
+            copyFromHost(host);
+          }
+        } finally {
+          cleanupCurrentConnection(false);
+          if (host != null) {
+            scheduler.freeHost(host);
+            metrics.threadFree();
+          }
+        }
+      }
+    } catch (InterruptedException ie) {
+      return;
+    } catch (Throwable t) {
+      shuffle.reportException(t);
+    }
+  }
+
+  public void shutDown() throws InterruptedException {
+    this.stopped = true;
+    interrupt();
+    cleanupCurrentConnection(true);
+    try {
+      join(5000);
+    } catch (InterruptedException ie) {
+      LOG.warn("Got interrupt while joining " + getName(), ie);
+    }
+  }
+
+  private Object cleanupLock = new Object();
+  private void cleanupCurrentConnection(boolean disconnect) {
+    // Synchronizing on cleanupLock to ensure we don't run into a parallel close
+    // Can't synchronize on the main class itself since that would cause the
+    // shutdown request to block
+    synchronized (cleanupLock) {
+      try {
+        if (httpConnection != null) {
+          httpConnection.cleanup(disconnect);
+        }
+      } catch (IOException e) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Exception while shutting down fetcher " + logIdentifier, e);
+        } else {
+          LOG.info("Exception while shutting down fetcher " + logIdentifier + ": " + e.getMessage());
+        }
+      }
+    }
+  }
+
+  /**
+   * The crux of the matter: fetch all available map outputs from the given host.
+   *
+   * @param host {@link MapHost} from which we need to
+   *             shuffle available map-outputs.
+   */
+  @VisibleForTesting
+  protected void copyFromHost(MapHost host) throws IOException {
+    // reset retryStartTime for a new host
+    retryStartTime = 0;
+    // Get completed maps on 'host'
+    List<InputAttemptIdentifier> srcAttempts = scheduler.getMapsForHost(host);
+    currentPartition = host.getPartitionId();
+    
+    // Sanity check to catch hosts with only 'OBSOLETE' maps, 
+    // especially at the tail of large jobs
+    if (srcAttempts.size() == 0) {
+      return;
+    }
+    
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("Fetcher " + id + " going to fetch from " + host + " for: "
+        + srcAttempts + ", partitionId: " + currentPartition);
+    }
+    
+    // List of maps to be fetched yet
+    remaining = new LinkedHashSet<InputAttemptIdentifier>(srcAttempts);
+    
+    // Construct the url and connect
+    if (!setupConnection(host, srcAttempts)) {
+      if (stopped) {
+        cleanupCurrentConnection(true);
+      }
+      // Add back all remaining maps - which at this point is ALL MAPS the
+      // Fetcher was started with. The Scheduler takes care of retries,
+      // reporting too many failures etc.
+      putBackRemainingMapOutputs(host);
+      return;
+    }
+
+    try {
+      // Loop through available map-outputs and fetch them.
+      // On any error, failedTasks is not null and we exit
+      // after putting back the remaining maps to the
+      // yet_to_be_fetched list and marking the failed tasks.
+      InputAttemptIdentifier[] failedTasks = null;
+      while (!remaining.isEmpty() && failedTasks == null) {
+        // Fail immediately after the first failure because we don't know how much to
+        // skip for this error in the input stream. So we cannot move on to the
+        // remaining outputs. YARN-1773. Will get to them in the next retry.
+        try {
+          failedTasks = copyMapOutput(host, input);
+        } catch (FetcherReadTimeoutException e) {
+          // Setup connection again if disconnected
+          cleanupCurrentConnection(true);
+
+          // Connect with retry
+          if (!setupConnection(host, new LinkedList<InputAttemptIdentifier>(remaining))) {
+            if (stopped) {
+              cleanupCurrentConnection(true);
+            }
+            failedTasks = new InputAttemptIdentifier[] {getNextRemainingAttempt()};
+            break;
+          }
+        }
+      }
+      
+      if(failedTasks != null && failedTasks.length > 0) {
+        LOG.warn("copyMapOutput failed for tasks "+Arrays.toString(failedTasks));
+        for(InputAttemptIdentifier left: failedTasks) {
+          scheduler.copyFailed(left, host, true, false);
+        }
+      }
+
+      cleanupCurrentConnection(false);
+
+      // Sanity check
+      if (failedTasks == null && !remaining.isEmpty()) {
+        throw new IOException("server didn't return all expected map outputs: "
+            + remaining.size() + " left.");
+      }
+    } finally {
+      putBackRemainingMapOutputs(host);
+    }
+  }
+
+  @VisibleForTesting
+  boolean setupConnection(MapHost host, List<InputAttemptIdentifier> attempts)
+      throws IOException {
+    boolean connectSucceeded = false;
+    try {
+      URL url = ShuffleUtils.constructInputURL(host.getBaseUrl(), attempts,
+          httpConnectionParams.getKeepAlive());
+      httpConnection = new HttpConnection(url, httpConnectionParams,
+          logIdentifier, jobTokenSecret);
+      connectSucceeded = httpConnection.connect();
+
+      if (stopped) {
+        LOG.info("Detected fetcher has been shutdown after connection establishment. Returning");
+        return false;
+      }
+      input = httpConnection.getInputStream();
+      httpConnection.validate();
+      return true;
+    } catch (IOException ie) {
+      if (stopped) {
+        LOG.info("Not reporting fetch failure, since an Exception was caught after shutdown");
+        return false;
+      }
+      ioErrs.increment(1);
+      if (!connectSucceeded) {
+        LOG.warn("Failed to connect to " + host + " with " + remaining.size() + " inputs", ie);
+        connectionErrs.increment(1);
+      } else {
+        LOG.warn("Failed to verify reply after connecting to " + host + " with " + remaining.size()
+            + " inputs pending", ie);
+      }
+
+      // At this point, either the connection failed, or the initial header verification failed.
+      // The error does not relate to any specific Input. Report all of them as failed.
+      // This ends up indirectly penalizing the host (multiple failures reported on the single host)
+      for(InputAttemptIdentifier left: remaining) {
+        // Need to be handling temporary glitches ..
+        // Report read error to the AM to trigger source failure heuristics
+        scheduler.copyFailed(left, host, connectSucceeded, !connectSucceeded);
+      }
+      return false;
+    }
+  }
+
+  @VisibleForTesting
+  protected void putBackRemainingMapOutputs(MapHost host) {
+    // Cycle through remaining MapOutputs
+    boolean isFirst = true;
+    InputAttemptIdentifier first = null;
+    for (InputAttemptIdentifier left : remaining) {
+      if (isFirst) {
+        first = left;
+        isFirst = false;
+        continue;
+      }
+      scheduler.putBackKnownMapOutput(host, left);
+    }
+    if (first != null) { // Empty remaining list.
+      scheduler.putBackKnownMapOutput(host, first);
+    }
+  }
+
+  private static InputAttemptIdentifier[] EMPTY_ATTEMPT_ID_ARRAY = new InputAttemptIdentifier[0];
+  
+  protected InputAttemptIdentifier[] copyMapOutput(MapHost host,
+                                DataInputStream input) throws FetcherReadTimeoutException {
+    MapOutput mapOutput = null;
+    InputAttemptIdentifier srcAttemptId = null;
+    long decompressedLength = -1;
+    long compressedLength = -1;
+    
+    try {
+      long startTime = System.currentTimeMillis();
+      int forReduce = -1;
+      //Read the shuffle header
+      try {
+        ShuffleHeader header = new ShuffleHeader();
+        // TODO Review: Multiple header reads in case of status WAIT ? 
+        header.readFields(input);
+        if (!header.mapId.startsWith(InputAttemptIdentifier.PATH_PREFIX)) {
+          throw new IllegalArgumentException(
+              "Invalid header received: " + header.mapId + " partition: " + header.forReduce);
+        }
+        srcAttemptId = 
+            scheduler.getIdentifierForFetchedOutput(header.mapId, header.forReduce);
+        compressedLength = header.compressedLength;
+        decompressedLength = header.uncompressedLength;
+        forReduce = header.forReduce;
+      } catch (IllegalArgumentException e) {
+        badIdErrs.increment(1);
+        LOG.warn("Invalid map id ", e);
+        // Don't know which one was bad, so consider this one bad and don't read
+        // the remaining ones because we don't know where to start reading from. YARN-1773
+        return new InputAttemptIdentifier[] {getNextRemainingAttempt()};
+      }
+
+      // Do some basic sanity verification
+      if (!verifySanity(compressedLength, decompressedLength, forReduce,
+          remaining, srcAttemptId)) {
+        if (srcAttemptId == null) {
+          LOG.warn("Was expecting " + getNextRemainingAttempt() + " but got null");
+          srcAttemptId = getNextRemainingAttempt();
+        }
+        assert(srcAttemptId != null);
+        return new InputAttemptIdentifier[] {srcAttemptId};
+      }
+      
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("header: " + srcAttemptId + ", len: " + compressedLength + 
+            ", decomp len: " + decompressedLength);
+      }
+      
+      // Get the location for the map output - either in-memory or on-disk
+      try {
+        mapOutput = merger.reserve(srcAttemptId, decompressedLength, compressedLength, id);
+      } catch (IOException e) {
+        // Kill the reduce attempt
+        ioErrs.increment(1);
+        scheduler.reportLocalError(e);
+        return EMPTY_ATTEMPT_ID_ARRAY;
+      }
+      
+      // Check if we can shuffle *now* ...
+      if (mapOutput.getType() == Type.WAIT) {
+        // TODO Review: Does this cause a tight loop ?
+        LOG.info("fetcher#" + id + " - MergerManager returned Status.WAIT ...");
+        //Not an error but wait to process data.
+        return EMPTY_ATTEMPT_ID_ARRAY;
+      } 
+      
+      // Go!
+      LOG.info("fetcher#" + id + " about to shuffle output of map " + 
+               mapOutput.getAttemptIdentifier() + " decomp: " +
+               decompressedLength + " len: " + compressedLength);
+      if (mapOutput.getType() == Type.MEMORY) {
+        ShuffleUtils.shuffleToMemory(mapOutput.getMemory(), input,
+          (int) decompressedLength, (int) compressedLength, codec, ifileReadAhead,
+          ifileReadAheadLength, LOG, mapOutput.getAttemptIdentifier().toString());
+      } else if (mapOutput.getType() == Type.DISK) {
+        ShuffleUtils.shuffleToDisk(mapOutput.getDisk(), host.getHostIdentifier(),
+          input, compressedLength, LOG, mapOutput.getAttemptIdentifier().toString());
+      } else {
+        throw new IOException("Unknown mapOutput type while fetching shuffle data:" +
+            mapOutput.getType());
+      }
+
+      // Inform the shuffle scheduler
+      long endTime = System.currentTimeMillis();
+      // Reset retryStartTime since the map task made progress (relevant if it was retried before).
+      retryStartTime = 0;
+
+      scheduler.copySucceeded(srcAttemptId, host, compressedLength, decompressedLength, 
+                              endTime - startTime, mapOutput);
+      // Note successful shuffle
+      remaining.remove(srcAttemptId);
+      metrics.successFetch();
+      return null;
+    } catch (IOException ioe) {
+      if (stopped) {
+        LOG.info("Not reporting fetch failure for exception during data copy: ["
+            + ioe.getClass().getName() + ", " + ioe.getMessage() + "]");
+        cleanupCurrentConnection(true);
+        if (mapOutput != null) {
+          mapOutput.abort(); // Release resources
+        }
+        // Don't need to put back - since that's handled by the invoker
+        return EMPTY_ATTEMPT_ID_ARRAY;
+      }
+      if (shouldRetry(host, ioe)) {
+        //release mem/file handles
+        if (mapOutput != null) {
+          mapOutput.abort();
+        }
+        throw new FetcherReadTimeoutException(ioe);
+      }
+      ioErrs.increment(1);
+      if (srcAttemptId == null || mapOutput == null) {
+        LOG.info("fetcher#" + id + " failed to read map header" + 
+                 srcAttemptId + " decomp: " + 
+                 decompressedLength + ", " + compressedLength, ioe);
+        if(srcAttemptId == null) {
+          return remaining.toArray(new InputAttemptIdentifier[remaining.size()]);
+        } else {
+          return new InputAttemptIdentifier[] {srcAttemptId};
+        }
+      }
+      
+      LOG.warn("Failed to shuffle output of " + srcAttemptId + 
+               " from " + host.getHostIdentifier(), ioe); 
+
+      // Inform the shuffle-scheduler
+      mapOutput.abort();
+      metrics.failedFetch();
+      return new InputAttemptIdentifier[] {srcAttemptId};
+    }
+
+  }
+
+  /**
+   * Check whether the connection needs to be re-established.
+   *
+   * @param host the host being copied from
+   * @param ioe the exception that triggered this check
+   * @return true to indicate a connection retry; false otherwise.
+   */
+  private boolean shouldRetry(MapHost host, IOException ioe) {
+    if (!(ioe instanceof SocketTimeoutException)) {
+      return false;
+    }
+    // First time to retry.
+    long currentTime = System.currentTimeMillis();
+    if (retryStartTime == 0) {
+      retryStartTime = currentTime;
+    }
+
+    if (currentTime - retryStartTime < httpConnectionParams.getReadTimeout()) {
+      LOG.warn("Shuffle output from " + host.getHostIdentifier() +
+          " failed, retry it.");
+      //retry connecting to the host
+      return true;
+    } else {
+      // Timed out; give up on this copy and report it as failed.
+      LOG.warn("Timeout for copying MapOutput with retry on host " + host
+          + " after " + httpConnectionParams.getReadTimeout() + " milliseconds.");
+      return false;
+    }
+  }
+  
+  /**
+   * Do some basic verification on the input received -- Being defensive
+   * @param compressedLength
+   * @param decompressedLength
+   * @param forReduce
+   * @param remaining
+   * @param srcAttemptId
+   * @return true if the verification succeeded, false otherwise
+   */
+  private boolean verifySanity(long compressedLength, long decompressedLength,
+      int forReduce, Set<InputAttemptIdentifier> remaining, InputAttemptIdentifier srcAttemptId) {
+    if (compressedLength < 0 || decompressedLength < 0) {
+      wrongLengthErrs.increment(1);
+      LOG.warn(getName() + " invalid lengths in map output header: id: " +
+          srcAttemptId + " len: " + compressedLength + ", decomp len: " + 
+               decompressedLength);
+      return false;
+    }
+
+    // partitionId verification. Isn't available here because it is encoded into
+    // the URI.
+    if (forReduce != currentPartition) {
+      wrongReduceErrs.increment(1);
+      LOG.warn(getName() + " data for the wrong partition map: " + srcAttemptId + " len: "
+          + compressedLength + " decomp len: " + decompressedLength + " for partition " + forReduce
+          + ", expected partition: " + currentPartition);
+      return false;
+    }
+
+    // Sanity check
+    if (!remaining.contains(srcAttemptId)) {
+      wrongMapErrs.increment(1);
+      LOG.warn("Invalid map-output! Received output for " + srcAttemptId);
+      return false;
+    }
+    
+    return true;
+  }
+  
+  private InputAttemptIdentifier getNextRemainingAttempt() {
+    if (remaining.size() > 0) {
+      return remaining.iterator().next();
+    } else {
+      return null;
+    }
+  }
+
+  @VisibleForTesting
+  protected void setupLocalDiskFetch(MapHost host) throws InterruptedException {
+    // Get completed maps on 'host'
+    List<InputAttemptIdentifier> srcAttempts = scheduler.getMapsForHost(host);
+    currentPartition = host.getPartitionId();
+
+    // Sanity check to catch hosts with only 'OBSOLETE' maps,
+    // especially at the tail of large jobs
+    if (srcAttempts.size() == 0) {
+      return;
+    }
+
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("Fetcher " + id + " going to fetch (local disk) from " + host + " for: "
+          + srcAttempts + ", partitionId: " + currentPartition);
+    }
+
+    // List of maps to be fetched yet
+    remaining = new LinkedHashSet<InputAttemptIdentifier>(srcAttempts);
+
+    try {
+      final Iterator<InputAttemptIdentifier> iter = remaining.iterator();
+      while (iter.hasNext()) {
+        InputAttemptIdentifier srcAttemptId = iter.next();
+        MapOutput mapOutput = null;
+        try {
+          long startTime = System.currentTimeMillis();
+          Path filename = getShuffleInputFileName(srcAttemptId.getPathComponent(), null);
+
+          TezIndexRecord indexRecord = getIndexRecord(srcAttemptId.getPathComponent(),
+              currentPartition);
+
+          mapOutput = getMapOutputForDirectDiskFetch(srcAttemptId, filename, indexRecord);
+          long endTime = System.currentTimeMillis();
+          scheduler.copySucceeded(srcAttemptId, host, indexRecord.getPartLength(),
+              indexRecord.getRawLength(), (endTime - startTime), mapOutput);
+          iter.remove();
+          metrics.successFetch();
+        } catch (IOException e) {
+          if (mapOutput != null) {
+            mapOutput.abort();
+          }
+          metrics.failedFetch();
+          ioErrs.increment(1);
+          scheduler.copyFailed(srcAttemptId, host, true, false);
+          LOG.warn("Failed to read local disk output of " + srcAttemptId + " from " +
+              host.getHostIdentifier(), e);
+        }
+      }
+    } finally {
+      putBackRemainingMapOutputs(host);
+    }
+
+  }
+
+  @VisibleForTesting
+  protected Path getShuffleInputFileName(String pathComponent, String suffix)
+      throws IOException {
+    LocalDirAllocator localDirAllocator = new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
+    suffix = suffix != null ? suffix : "";
+
+    String pathFromLocalDir = Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + Path.SEPARATOR +
+        pathComponent + Path.SEPARATOR + Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING + suffix;
+
+    return localDirAllocator.getLocalPathToRead(pathFromLocalDir.toString(), conf);
+  }
+
+  @VisibleForTesting
+  protected TezIndexRecord getIndexRecord(String pathComponent, int partitionId)
+      throws IOException {
+    Path indexFile = getShuffleInputFileName(pathComponent,
+        Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING);
+    TezSpillRecord spillRecord = new TezSpillRecord(indexFile, conf);
+    return spillRecord.getIndex(partitionId);
+  }
+
+  @VisibleForTesting
+  protected MapOutput getMapOutputForDirectDiskFetch(InputAttemptIdentifier srcAttemptId,
+                                                     Path filename, TezIndexRecord indexRecord)
+      throws IOException {
+    return MapOutput.createLocalDiskMapOutput(srcAttemptId, merger, filename,
+        indexRecord.getStartOffset(), indexRecord.getPartLength(), true);
+  }
+}
+
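
For reference, the read-retry handling in copyMapOutput()/shouldRetry() above reduces to a
small, host-scoped retry window: the first SocketTimeoutException opens the window, later
timeouts are retried only while the elapsed time stays under the configured HTTP read
timeout, and the window is reset whenever a copy succeeds or a new host is picked up. A
minimal standalone sketch of that policy (the ReadRetryPolicy name and plain-millisecond
parameters are illustrative, not part of this patch):

  // Sketch of the retry window used by FetcherOrderedGrouped.shouldRetry().
  class ReadRetryPolicy {
    private final long readTimeoutMillis;
    private long retryStartTime = 0; // 0 means no retry window is open

    ReadRetryPolicy(long readTimeoutMillis) {
      this.readTimeoutMillis = readTimeoutMillis;
    }

    // Called when a read from the current host times out.
    boolean shouldRetry(long nowMillis) {
      if (retryStartTime == 0) {
        retryStartTime = nowMillis; // first failure opens the window
      }
      return nowMillis - retryStartTime < readTimeoutMillis;
    }

    // Called after a successful copy, or when moving on to a new host.
    void reset() {
      retryStartTime = 0;
    }
  }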

http://git-wip-us.apache.org/repos/asf/tez/blob/196bcca9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java
new file mode 100644
index 0000000..7ec56ec
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
+
+import java.io.DataInput;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.common.sort.impl.IFile.Reader;
+
+/**
+ * <code>IFile.InMemoryReader</code> to read map-outputs present in-memory.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class InMemoryReader extends Reader {
+
+  private final InputAttemptIdentifier taskAttemptId;
+  private final MergeManager merger;
+  DataInputBuffer memDataIn = new DataInputBuffer();
+  private int start;
+  private int length;
+  private int originalKeyPos;
+
+  public InMemoryReader(MergeManager merger,
+      InputAttemptIdentifier taskAttemptId, byte[] data, int start,
+      int length)
+      throws IOException {
+    super(null, length - start, null, null, null, false, 0, -1);
+    this.merger = merger;
+    this.taskAttemptId = taskAttemptId;
+
+    buffer = data;
+    bufferSize = (int) length;
+    memDataIn.reset(buffer, start, length);
+    this.start = start;
+    this.length = length;
+  }
+
+  @Override
+  public void reset(int offset) {
+    memDataIn.reset(buffer, start + offset, length);
+    bytesRead = offset;
+    eof = false;
+  }
+
+  @Override
+  public long getPosition() throws IOException {
+    // InMemoryReader does not initialize streams like Reader, so in.getPos()
+    // would not work. Instead, return the number of uncompressed bytes read,
+    // which will be correct since in-memory data is not compressed.
+    return bytesRead;
+  }
+
+  @Override
+  public long getLength() {
+    return length;
+  }
+
+  private void dumpOnError() {
+    File dumpFile = new File("../output/" + taskAttemptId + ".dump");
+    System.err.println("Dumping corrupt map-output of " + taskAttemptId +
+                       " to " + dumpFile.getAbsolutePath());
+    try {
+      FileOutputStream fos = new FileOutputStream(dumpFile);
+      fos.write(buffer, 0, bufferSize);
+      fos.close();
+    } catch (IOException ioe) {
+      System.err.println("Failed to dump map-output of " + taskAttemptId);
+    }
+  }
+
+  protected void readKeyValueLength(DataInput dIn) throws IOException {
+    super.readKeyValueLength(dIn);
+    if (currentKeyLength != IFile.RLE_MARKER) {
+      originalKeyPos = memDataIn.getPosition();
+    }
+  }
+
+  public KeyState readRawKey(DataInputBuffer key) throws IOException {
+    try {
+      if (!positionToNextRecord(memDataIn)) {
+        return KeyState.NO_KEY;
+      }
+      // Setup the key
+      int pos = memDataIn.getPosition();
+      byte[] data = memDataIn.getData();
+      if (currentKeyLength == IFile.RLE_MARKER) {
+        // get key length from original key
+        key.reset(data, originalKeyPos, originalKeyLength);
+        return KeyState.SAME_KEY;
+      }
+      key.reset(data, pos, currentKeyLength);
+      // Position for the next value
+      long skipped = memDataIn.skip(currentKeyLength);
+      if (skipped != currentKeyLength) {
+        throw new IOException("Rec# " + recNo +
+            ": Failed to skip past key of length: " +
+            currentKeyLength);
+      }
+      bytesRead += currentKeyLength;
+      return KeyState.NEW_KEY;
+    } catch (IOException ioe) {
+      dumpOnError();
+      throw ioe;
+    }
+  }
+
+  public void nextRawValue(DataInputBuffer value) throws IOException {
+    try {
+      int pos = memDataIn.getPosition();
+      byte[] data = memDataIn.getData();
+      value.reset(data, pos, currentValueLength);
+
+      // Position for the next record
+      long skipped = memDataIn.skip(currentValueLength);
+      if (skipped != currentValueLength) {
+        throw new IOException("Rec# " + recNo +
+            ": Failed to skip past value of length: " +
+            currentValueLength);
+      }
+      // Record the bytes read
+      bytesRead += currentValueLength;
+      ++recNo;
+    } catch (IOException ioe) {
+      dumpOnError();
+      throw ioe;
+    }
+  }
+
+  public void close() {
+    // Release
+    dataIn = null;
+    buffer = null;
+    // Inform the MergeManager
+    if (merger != null) {
+      merger.unreserve(bufferSize);
+    }
+  }
+}
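
A note on the reading pattern above: InMemoryReader serves IFile records straight out of a
byte[] by resetting a DataInputBuffer over a (start, length) slice and tracking bytesRead
itself, rather than going through the stream-based IFile.Reader path. A minimal sketch of
that slice-reading pattern (the class name and the toy length-prefixed record format are
illustrative only):

  import java.io.IOException;
  import org.apache.hadoop.io.DataInputBuffer;

  public class InMemorySliceExample {
    public static void main(String[] args) throws IOException {
      // Two leading bytes of padding, then a 1-byte length followed by the payload.
      byte[] data = new byte[] {0, 0, 5, 'h', 'e', 'l', 'l', 'o'};
      DataInputBuffer in = new DataInputBuffer();
      in.reset(data, 2, data.length - 2);   // expose only the slice starting at offset 2

      int valueLength = in.readByte();      // 5
      byte[] value = new byte[valueLength];
      in.readFully(value);                  // "hello"
      System.out.println(new String(value, "UTF-8") + " read up to pos=" + in.getPosition());
    }
  }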

http://git-wip-us.apache.org/repos/asf/tez/blob/196bcca9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryWriter.java
new file mode 100644
index 0000000..c9896dd
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryWriter.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.BoundedByteArrayOutputStream;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.common.sort.impl.IFileOutputStream;
+import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class InMemoryWriter extends Writer {
+  private static final Log LOG = LogFactory.getLog(InMemoryWriter.class);
+
+  // TODO Verify and fix counters if required.
+
+  public InMemoryWriter(BoundedByteArrayOutputStream arrayStream) {
+    super(null, null);
+    this.out =
+      new DataOutputStream(new IFileOutputStream(arrayStream));
+  }
+
+  public void append(Object key, Object value) throws IOException {
+    throw new UnsupportedOperationException(
+        "InMemoryWriter.append(K key, V value)");
+  }
+
+  public void close() throws IOException {
+    // Write EOF_MARKER for key/value length
+    WritableUtils.writeVInt(out, IFile.EOF_MARKER);
+    WritableUtils.writeVInt(out, IFile.EOF_MARKER);
+
+    // Close the stream
+    out.close();
+    out = null;
+  }
+
+}
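
The close() contract above is worth spelling out: an in-memory IFile segment is terminated
by writing the EOF marker twice, once in the key-length slot and once in the value-length
slot, so a reader positioned on the next record header knows the stream has ended. A small
sketch of that convention (the EOF_MARKER constant here is an assumption standing in for
IFile.EOF_MARKER, and the class name is illustrative):

  import java.io.ByteArrayInputStream;
  import java.io.ByteArrayOutputStream;
  import java.io.DataInputStream;
  import java.io.DataOutputStream;
  import java.io.IOException;
  import org.apache.hadoop.io.WritableUtils;

  public class EofMarkerExample {
    static final int EOF_MARKER = -1; // assumption: mirrors the IFile end-of-file sentinel

    public static void main(String[] args) throws IOException {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      WritableUtils.writeVInt(out, EOF_MARKER); // key-length slot
      WritableUtils.writeVInt(out, EOF_MARKER); // value-length slot
      out.close();

      DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
      int keyLength = WritableUtils.readVInt(in);
      System.out.println(keyLength == EOF_MARKER ? "end of stream" : "more records");
    }
  }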

http://git-wip-us.apache.org/repos/asf/tez/blob/196bcca9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapHost.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapHost.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapHost.java
new file mode 100644
index 0000000..211fc83
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapHost.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+
+@Private
+class MapHost {
+  
+  public static enum State {
+    IDLE,               // No map outputs available
+    BUSY,               // Map outputs are being fetched
+    PENDING,            // Known map outputs which need to be fetched
+    PENALIZED           // Host penalized due to shuffle failures
+  }
+  
+  private State state = State.IDLE;
+  private final String hostIdentifier;
+  private final int partitionId;
+  private final String baseUrl;
+  private final String identifier;
+  // Tracks attempt IDs
+  private List<InputAttemptIdentifier> maps = new ArrayList<InputAttemptIdentifier>();
+  
+  public MapHost(int partitionId, String hostPort, String baseUrl) {
+    this.partitionId = partitionId;
+    this.hostIdentifier = hostPort;
+    this.baseUrl = baseUrl;
+    this.identifier = createIdentifier(hostPort, partitionId);
+  }
+  
+  public static String createIdentifier(String hostName, int partitionId) {
+    return hostName + ":" + Integer.toString(partitionId);
+  }
+  
+  public String getIdentifier() {
+    return identifier;
+  }
+  
+  public int getPartitionId() {
+    return partitionId;
+  }
+
+  public State getState() {
+    return state;
+  }
+
+  public String getHostIdentifier() {
+    return hostIdentifier;
+  }
+
+  public String getBaseUrl() {
+    return baseUrl;
+  }
+
+  public synchronized void addKnownMap(InputAttemptIdentifier srcAttempt) {
+    maps.add(srcAttempt);
+    if (state == State.IDLE) {
+      state = State.PENDING;
+    }
+  }
+
+  public synchronized List<InputAttemptIdentifier> getAndClearKnownMaps() {
+    List<InputAttemptIdentifier> currentKnownMaps = maps;
+    maps = new ArrayList<InputAttemptIdentifier>();
+    return currentKnownMaps;
+  }
+  
+  public synchronized void markBusy() {
+    state = State.BUSY;
+  }
+  
+  public synchronized void markPenalized() {
+    state = State.PENALIZED;
+  }
+  
+  public synchronized int getNumKnownMapOutputs() {
+    return maps.size();
+  }
+
+  /**
+   * Called when the node is done with its penalty or done copying.
+   * @return the host's new state
+   */
+  public synchronized State markAvailable() {
+    if (maps.isEmpty()) {
+      state = State.IDLE;
+    } else {
+      state = State.PENDING;
+    }
+    return state;
+  }
+  
+  @Override
+  public String toString() {
+    return hostIdentifier;
+  }
+  
+  /**
+   * Mark the host as penalized
+   */
+  public synchronized void penalize() {
+    state = State.PENALIZED;
+  }
+}
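
MapHost above is essentially a small per-(host, partition) state machine: a host is IDLE
until an output is registered (PENDING), BUSY while a fetcher owns it, PENALIZED after
shuffle failures, and markAvailable() sends it back to PENDING or IDLE depending on
whether outputs remain. A self-contained restatement of those transitions (class and
member names here are illustrative; the real MapHost also carries a partition id and a
base URL):

  import java.util.ArrayList;
  import java.util.List;

  class HostState {
    enum State { IDLE, PENDING, BUSY, PENALIZED }

    private State state = State.IDLE;
    private final List<String> knownOutputs = new ArrayList<String>();

    synchronized void addKnownOutput(String attempt) {
      knownOutputs.add(attempt);
      if (state == State.IDLE) {
        state = State.PENDING;   // new work makes an idle host schedulable
      }
    }

    synchronized void markBusy()  { state = State.BUSY; }      // handed to a fetcher
    synchronized void penalize()  { state = State.PENALIZED; } // after shuffle failures

    // After a copy finishes (or a penalty expires): PENDING if work remains, else IDLE.
    synchronized State markAvailable() {
      state = knownOutputs.isEmpty() ? State.IDLE : State.PENDING;
      return state;
    }

    synchronized List<String> getAndClearKnownOutputs() {
      List<String> current = new ArrayList<String>(knownOutputs);
      knownOutputs.clear();
      return current;
    }
  }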

http://git-wip-us.apache.org/repos/asf/tez/blob/196bcca9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
new file mode 100644
index 0000000..c735a43
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Comparator;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BoundedByteArrayOutputStream;
+import org.apache.hadoop.io.FileChunk;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
+
+
+class MapOutput {
+  private static final Log LOG = LogFactory.getLog(MapOutput.class);
+  private static AtomicInteger ID = new AtomicInteger(0);
+  
+  public static enum Type {
+    WAIT,
+    MEMORY,
+    DISK,
+    DISK_DIRECT
+  }
+
+  private final int id;
+  private final Type type;
+  private InputAttemptIdentifier attemptIdentifier;
+  private final long size;
+
+  private final boolean primaryMapOutput;
+  private final MergeManager merger;
+
+  // MEMORY
+  private final byte[] memory;
+  private BoundedByteArrayOutputStream byteStream;
+
+  // DISK
+  private final FileSystem localFS;
+  private final Path tmpOutputPath;
+  private final FileChunk outputPath;
+  private OutputStream disk;
+
+  private MapOutput(Type type, InputAttemptIdentifier attemptIdentifier, MergeManager merger,
+                    long size, Path outputPath, long offset, boolean primaryMapOutput,
+                    FileSystem fs, Path tmpOutputPath) {
+    this.id = ID.incrementAndGet();
+    this.type = type;
+    this.attemptIdentifier = attemptIdentifier;
+    this.merger = merger;
+    this.primaryMapOutput = primaryMapOutput;
+
+    this.localFS = fs;
+    this.size = size;
+
+    // Other type specific values
+
+    if (type == Type.MEMORY) {
+      // Since we are passing an int from createMemoryMapOutput, it's safe to cast to int.
+      this.byteStream = new BoundedByteArrayOutputStream((int)size);
+      this.memory = byteStream.getBuffer();
+    } else {
+      this.byteStream = null;
+      this.memory = null;
+    }
+
+    this.tmpOutputPath = tmpOutputPath;
+    this.disk = null;
+
+    if (type == Type.DISK || type == Type.DISK_DIRECT) {
+      boolean preserve = (type == Type.DISK_DIRECT); // Type.DISK outputs are temp files and are not preserved.
+      this.outputPath = new FileChunk(outputPath, offset, size, preserve);
+    } else {
+      this.outputPath = null;
+    }
+
+  }
+
+  public static MapOutput createDiskMapOutput(InputAttemptIdentifier attemptIdentifier,
+                                              MergeManager merger, long size, Configuration conf,
+                                              int fetcher, boolean primaryMapOutput,
+                                              TezTaskOutputFiles mapOutputFile) throws
+      IOException {
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path outputpath = mapOutputFile.getInputFileForWrite(
+        attemptIdentifier.getInputIdentifier().getInputIndex(), size);
+    Path tmpOuputPath = outputpath.suffix(String.valueOf(fetcher));
+    long offset = 0;
+
+    MapOutput mapOutput = new MapOutput(Type.DISK, attemptIdentifier, merger, size, outputpath, offset,
+        primaryMapOutput, fs, tmpOuputPath);
+    mapOutput.disk = mapOutput.localFS.create(tmpOuputPath);
+
+    return mapOutput;
+  }
+
+  public static MapOutput createLocalDiskMapOutput(InputAttemptIdentifier attemptIdentifier,
+                                                   MergeManager merger, Path path,  long offset,
+                                                   long size, boolean primaryMapOutput)  {
+    return new MapOutput(Type.DISK_DIRECT, attemptIdentifier, merger, size, path, offset,
+        primaryMapOutput, null, null);
+  }
+
+  public static MapOutput createMemoryMapOutput(InputAttemptIdentifier attemptIdentifier,
+                                                MergeManager merger, int size,
+                                                boolean primaryMapOutput)  {
+    return new MapOutput(Type.MEMORY, attemptIdentifier, merger, size, null, -1, primaryMapOutput,
+        null, null);
+  }
+
+  public static MapOutput createWaitMapOutput(InputAttemptIdentifier attemptIdentifier) {
+    return new MapOutput(Type.WAIT, attemptIdentifier, null, -1, null, -1, false, null, null);
+  }
+
+  public boolean isPrimaryMapOutput() {
+    return primaryMapOutput;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof MapOutput) {
+      return id == ((MapOutput)obj).id;
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return id;
+  }
+
+  public FileChunk getOutputPath() {
+    return outputPath;
+  }
+
+  public byte[] getMemory() {
+    return memory;
+  }
+
+  public BoundedByteArrayOutputStream getArrayStream() {
+    return byteStream;
+  }
+  
+  public OutputStream getDisk() {
+    return disk;
+  }
+
+  public InputAttemptIdentifier getAttemptIdentifier() {
+    return this.attemptIdentifier;
+  }
+
+  public Type getType() {
+    return type;
+  }
+
+  public long getSize() {
+    return size;
+  }
+
+  public void commit() throws IOException {
+    if (type == Type.MEMORY) {
+      merger.closeInMemoryFile(this);
+    } else if (type == Type.DISK) {
+      localFS.rename(tmpOutputPath, outputPath.getPath());
+      merger.closeOnDiskFile(outputPath);
+    } else if (type == Type.DISK_DIRECT) {
+      merger.closeOnDiskFile(outputPath);
+    } else {
+      throw new IOException("Cannot commit MapOutput of type WAIT!");
+    }
+  }
+  
+  public void abort() {
+    if (type == Type.MEMORY) {
+      merger.unreserve(memory.length);
+    } else if (type == Type.DISK) {
+      try {
+        localFS.delete(tmpOutputPath, false);
+      } catch (IOException ie) {
+        LOG.info("failure to clean up " + tmpOutputPath, ie);
+      }
+    } else if (type == Type.DISK_DIRECT) { //nothing to do.
+    } else {
+      throw new IllegalArgumentException(
+          "Cannot abort MapOutput of type WAIT!");
+    }
+  }
+  
+  public String toString() {
+    return "MapOutput( AttemptIdentifier: " + attemptIdentifier + ", Type: " + type + ")";
+  }
+  
+  public static class MapOutputComparator 
+  implements Comparator<MapOutput> {
+    public int compare(MapOutput o1, MapOutput o2) {
+      if (o1.id == o2.id) { 
+        return 0;
+      }
+      
+      if (o1.size < o2.size) {
+        return -1;
+      } else if (o1.size > o2.size) {
+        return 1;
+      }
+      
+      if (o1.id < o2.id) {
+        return -1;
+      } else {
+        return 1;
+      }
+    }
+  }
+}
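
One detail of MapOutput worth calling out is the MapOutputComparator: the MergeManager keeps
fetched outputs in TreeSets ordered by size (smallest first) with the unique id as a
tie-breaker, so two distinct outputs of the same size are never collapsed into one entry. A
minimal standalone sketch of that ordering (the Output class and the sample values are
illustrative):

  import java.util.Comparator;
  import java.util.TreeSet;

  public class MapOutputOrderingExample {
    static final class Output {
      final int id; final long size;
      Output(int id, long size) { this.id = id; this.size = size; }
      public String toString() { return "id=" + id + ",size=" + size; }
    }

    public static void main(String[] args) {
      Comparator<Output> bySizeThenId = new Comparator<Output>() {
        public int compare(Output o1, Output o2) {
          if (o1.id == o2.id) return 0;                         // same output
          if (o1.size != o2.size) return Long.compare(o1.size, o2.size);
          return Integer.compare(o1.id, o2.id);                 // tie-break on id
        }
      };
      TreeSet<Output> outputs = new TreeSet<Output>(bySizeThenId);
      outputs.add(new Output(1, 4096));
      outputs.add(new Output(2, 1024));
      outputs.add(new Output(3, 1024)); // same size as id=2, kept because the ids differ
      System.out.println(outputs);      // [id=2,size=1024, id=3,size=1024, id=1,size=4096]
    }
  }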

http://git-wip-us.apache.org/repos/asf/tez/blob/196bcca9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
new file mode 100644
index 0000000..0db5237
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
@@ -0,0 +1,930 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ChecksumFileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.FileChunk;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.util.Progressable;
+import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.common.Constants;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.combine.Combiner;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.common.sort.impl.TezMerger;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
+import org.apache.tez.runtime.library.common.sort.impl.TezMerger.Segment;
+import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
+import org.apache.tez.runtime.library.hadoop.compat.NullProgressable;
+
+
+/**
+ * Usage: create an instance, call setInitialMemoryAvailable(long), then
+ * configureAndStart() before using the manager.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+@SuppressWarnings(value={"rawtypes"})
+public class MergeManager {
+  
+  private static final Log LOG = LogFactory.getLog(MergeManager.class);
+
+  private final Configuration conf;
+  private final FileSystem localFS;
+  private final FileSystem rfs;
+  private final LocalDirAllocator localDirAllocator;
+  
+  private final  TezTaskOutputFiles mapOutputFile;
+  private final Progressable nullProgressable = new NullProgressable();
+  private final Combiner combiner;  
+  
+  private final Set<MapOutput> inMemoryMergedMapOutputs = 
+    new TreeSet<MapOutput>(new MapOutput.MapOutputComparator());
+  private final IntermediateMemoryToMemoryMerger memToMemMerger;
+
+  private final Set<MapOutput> inMemoryMapOutputs = 
+    new TreeSet<MapOutput>(new MapOutput.MapOutputComparator());
+  private final InMemoryMerger inMemoryMerger;
+  
+  private final Set<FileChunk> onDiskMapOutputs = new TreeSet<FileChunk>();
+  private final OnDiskMerger onDiskMerger;
+  
+  private final long memoryLimit;
+  private final int postMergeMemLimit;
+  private long usedMemory;
+  private long commitMemory;
+  private final int ioSortFactor;
+  private final long maxSingleShuffleLimit;
+  
+  private final int memToMemMergeOutputsThreshold; 
+  private final long mergeThreshold;
+  
+  private final long initialMemoryAvailable;
+
+  private final ExceptionReporter exceptionReporter;
+  
+  private final InputContext inputContext;
+
+  private final TezCounter spilledRecordsCounter;
+
+  private final TezCounter reduceCombineInputCounter;
+
+  private final TezCounter mergedMapOutputsCounter;
+  
+  private final TezCounter numMemToDiskMerges;
+  private final TezCounter numDiskToDiskMerges;
+  private final TezCounter additionalBytesWritten;
+  private final TezCounter additionalBytesRead;
+  
+  private final CompressionCodec codec;
+  
+  private volatile boolean finalMergeComplete = false;
+  
+  private final boolean ifileReadAhead;
+  private final int ifileReadAheadLength;
+  private final int ifileBufferSize;
+
+
+  /**
+   * Construct the MergeManager. Must call start before it becomes usable.
+   */
+  public MergeManager(Configuration conf, 
+                      FileSystem localFS,
+                      LocalDirAllocator localDirAllocator,  
+                      InputContext inputContext,
+                      Combiner combiner,
+                      TezCounter spilledRecordsCounter,
+                      TezCounter reduceCombineInputCounter,
+                      TezCounter mergedMapOutputsCounter,
+                      ExceptionReporter exceptionReporter,
+                      long initialMemoryAvailable,
+                      CompressionCodec codec,
+                      boolean ifileReadAheadEnabled,
+                      int ifileReadAheadLength) {
+    this.inputContext = inputContext;
+    this.conf = conf;
+    this.localDirAllocator = localDirAllocator;
+    this.exceptionReporter = exceptionReporter;
+    this.initialMemoryAvailable = initialMemoryAvailable;
+    
+    this.combiner = combiner;
+
+    this.reduceCombineInputCounter = reduceCombineInputCounter;
+    this.spilledRecordsCounter = spilledRecordsCounter;
+    this.mergedMapOutputsCounter = mergedMapOutputsCounter;
+    this.mapOutputFile = new TezTaskOutputFiles(conf, inputContext.getUniqueIdentifier());
+    
+    this.localFS = localFS;
+    this.rfs = ((LocalFileSystem)localFS).getRaw();
+    
+    this.numDiskToDiskMerges = inputContext.getCounters().findCounter(TaskCounter.NUM_DISK_TO_DISK_MERGES);
+    this.numMemToDiskMerges = inputContext.getCounters().findCounter(TaskCounter.NUM_MEM_TO_DISK_MERGES);
+    this.additionalBytesWritten = inputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN);
+    this.additionalBytesRead = inputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ);
+
+    this.codec = codec;
+    this.ifileReadAhead = ifileReadAheadEnabled;
+    if (this.ifileReadAhead) {
+      this.ifileReadAheadLength = ifileReadAheadLength;
+    } else {
+      this.ifileReadAheadLength = 0;
+    }
+    this.ifileBufferSize = conf.getInt("io.file.buffer.size",
+        TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT);
+    
+    // Figure out initial memory req start
+    final float maxInMemCopyUse =
+      conf.getFloat(
+          TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT, 
+          TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT_DEFAULT);
+    if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
+      throw new IllegalArgumentException("Invalid value for " +
+          TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT + ": " +
+          maxInMemCopyUse);
+    }
+
+    // Allow unit tests to fix Runtime memory
+    long memLimit = (long) (conf.getLong(Constants.TEZ_RUNTIME_TASK_MEMORY,
+        Math.min(inputContext.getTotalMemoryAvailableToTask(), Integer.MAX_VALUE)) * maxInMemCopyUse);
+
+    float maxRedPer = conf.getFloat(TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT,
+        TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_BUFFER_PERCENT_DEFAULT);
+    if (maxRedPer > 1.0 || maxRedPer < 0.0) {
+      throw new TezUncheckedException("Invalid value for " +
+          TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT + ": " + maxRedPer);
+    }
+    // TODO maxRedBuffer should be a long.
+    int maxRedBuffer = (int) Math.min(inputContext.getTotalMemoryAvailableToTask() * maxRedPer,
+        Integer.MAX_VALUE);
+    // Figure out initial memory req end
+    
+    if (this.initialMemoryAvailable < memLimit) {
+      this.memoryLimit = this.initialMemoryAvailable;
+    } else {
+      this.memoryLimit = memLimit;
+    }
+    
+    if (this.initialMemoryAvailable < maxRedBuffer) {
+      this.postMergeMemLimit = (int) this.initialMemoryAvailable;
+    } else {
+      this.postMergeMemLimit = maxRedBuffer;
+    }
+    
+    LOG.info("InitialRequest: ShuffleMem=" + memLimit + ", postMergeMem=" + maxRedBuffer
+        + ", RuntimeTotalAvailable=" + this.initialMemoryAvailable + "Updated to: ShuffleMem="
+        + this.memoryLimit + ", postMergeMem: " + this.postMergeMemLimit);
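+    // Illustrative example (assumed numbers, not defaults): with 2000 MB available to the
+    // task, a fetch buffer percent of 0.9 yields memLimit ~1800 MB and a post-merge buffer
+    // percent of 0.5 yields maxRedBuffer ~1000 MB; each is then capped by
+    // initialMemoryAvailable before being stored in memoryLimit / postMergeMemLimit above.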
+
+    this.ioSortFactor = 
+        conf.getInt(
+            TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR, 
+            TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR_DEFAULT);
+    
+    final float singleShuffleMemoryLimitPercent =
+        conf.getFloat(
+            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT,
+            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT_DEFAULT);
+    if (singleShuffleMemoryLimitPercent <= 0.0f
+        || singleShuffleMemoryLimitPercent > 1.0f) {
+      throw new IllegalArgumentException("Invalid value for "
+          + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT + ": "
+          + singleShuffleMemoryLimitPercent);
+    }
+
+    this.maxSingleShuffleLimit = 
+      (long)(memoryLimit * singleShuffleMemoryLimitPercent);
+    this.memToMemMergeOutputsThreshold = 
+            conf.getInt(
+                TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMTOMEM_SEGMENTS, 
+                ioSortFactor);
+    this.mergeThreshold = 
+        (long)(this.memoryLimit * 
+               conf.getFloat(
+                   TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, 
+                   TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT_DEFAULT));
+    LOG.info("MergerManager: memoryLimit=" + memoryLimit + ", " +
+             "maxSingleShuffleLimit=" + maxSingleShuffleLimit + ", " +
+             "mergeThreshold=" + mergeThreshold + ", " + 
+             "ioSortFactor=" + ioSortFactor + ", " +
+             "memToMemMergeOutputsThreshold=" + memToMemMergeOutputsThreshold);
+    
+    if (this.maxSingleShuffleLimit >= this.mergeThreshold) {
+      throw new RuntimeException("Invlaid configuration: "
+          + "maxSingleShuffleLimit should be less than mergeThreshold"
+          + "maxSingleShuffleLimit: " + this.maxSingleShuffleLimit
+          + "mergeThreshold: " + this.mergeThreshold);
+    }
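+    // Illustrative example (assumed numbers, not defaults): with memoryLimit=1000 MB, a
+    // single-shuffle limit percent of 0.25 gives maxSingleShuffleLimit=250 MB (any single
+    // fetched output larger than this is shuffled straight to disk), and a merge percent
+    // of 0.9 gives mergeThreshold=900 MB (an in-memory merge is triggered once this much
+    // data has been committed to memory), which also satisfies the check above.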
+    
+    boolean allowMemToMemMerge = 
+        conf.getBoolean(
+            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM, 
+            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM_DEFAULT);
+      if (allowMemToMemMerge) {
+        this.memToMemMerger = 
+          new IntermediateMemoryToMemoryMerger(this,
+                                               memToMemMergeOutputsThreshold);
+      } else {
+        this.memToMemMerger = null;
+      }
+      
+      this.inMemoryMerger = new InMemoryMerger(this);
+      
+      this.onDiskMerger = new OnDiskMerger(this);
+  }
+
+  @Private
+  void configureAndStart() {
+    if (this.memToMemMerger != null) {
+      memToMemMerger.start();
+    }
+    this.inMemoryMerger.start();
+    this.onDiskMerger.start();
+  }
+
+  /**
+   * Exposing this to get an initial memory ask without instantiating the object.
+   */
+  @Private
+  static long getInitialMemoryRequirement(Configuration conf, long maxAvailableTaskMemory) {
+    final float maxInMemCopyUse =
+        conf.getFloat(
+            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT, 
+            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT_DEFAULT);
+      if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
+        throw new IllegalArgumentException("Invalid value for " +
+            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT + ": " +
+            maxInMemCopyUse);
+      }
+
+      // Allow unit tests to fix Runtime memory
+      long memLimit = (long) (conf.getLong(Constants.TEZ_RUNTIME_TASK_MEMORY,
+          Math.min(maxAvailableTaskMemory, Integer.MAX_VALUE)) * maxInMemCopyUse);
+      
+      LOG.info("Initial Shuffle Memory Required: " + memLimit + ", based on INPUT_BUFFER_factor: " + maxInMemCopyUse);
+
+      float maxRedPer = conf.getFloat(TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT,
+          TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_BUFFER_PERCENT_DEFAULT);
+      if (maxRedPer > 1.0 || maxRedPer < 0.0) {
+        throw new TezUncheckedException("Invalid value for " +
+            TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT + ": " + maxRedPer);
+      }
+      // TODO maxRedBuffer should be a long.
+      int maxRedBuffer = (int) Math.min(maxAvailableTaskMemory * maxRedPer,
+          Integer.MAX_VALUE);
+      LOG.info("Initial Memory required for final merged output: " + maxRedBuffer + ", using factor: " + maxRedPer);
+
+      long reqMem = Math.max(maxRedBuffer, memLimit);
+      return reqMem;
+  }
+
+  public void waitForInMemoryMerge() throws InterruptedException {
+    inMemoryMerger.waitForMerge();
+  }
+  
+  private boolean canShuffleToMemory(long requestedSize) {
+    return (requestedSize < maxSingleShuffleLimit);
+  }
+
+  final private MapOutput stallShuffle = MapOutput.createWaitMapOutput(null);
+
+  public synchronized MapOutput reserve(InputAttemptIdentifier srcAttemptIdentifier, 
+                                             long requestedSize,
+                                             long compressedLength,
+                                             int fetcher
+                                             ) throws IOException {
+    if (!canShuffleToMemory(requestedSize)) {
+      LOG.info(srcAttemptIdentifier + ": Shuffling to disk since " + requestedSize + 
+               " is greater than maxSingleShuffleLimit (" + 
+               maxSingleShuffleLimit + ")");
+      return MapOutput.createDiskMapOutput(srcAttemptIdentifier, this, compressedLength, conf,
+          fetcher, true, mapOutputFile);
+    }
+    
+    // Stall shuffle if we are above the memory limit
+
+    // It is possible that all threads could just be stalling and not make
+    // progress at all. This could happen when:
+    //
+    // requested size is causing the used memory to go above limit &&
+    // requested size < singleShuffleLimit &&
+    // current used size < mergeThreshold (merge will not get triggered)
+    //
+    // To avoid this from happening, we allow exactly one thread to go past
+    // the memory limit. We check (usedMemory > memoryLimit) and not
+    // (usedMemory + requestedSize > memoryLimit). When this thread is done
+    // fetching, this will automatically trigger a merge thereby unlocking
+    // all the stalled threads
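+    //
+    // Illustrative scenario (assumed numbers): memoryLimit=1000, mergeThreshold=900,
+    // usedMemory=commitMemory=850. A 200 unit request is allowed through (850 <= 1000),
+    // pushing usedMemory to 1050; later requests stall. Once that fetch completes,
+    // commitMemory crosses 900 and the in-memory merge frees memory for the stalled
+    // fetchers. Had the check been (usedMemory + requestedSize > memoryLimit), every
+    // fetcher could have stalled with commitMemory stuck below the merge threshold.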
+    
+    if (usedMemory > memoryLimit) {
+      LOG.debug(srcAttemptIdentifier + ": Stalling shuffle since usedMemory (" + usedMemory
+          + ") is greater than memoryLimit (" + memoryLimit + ")." + 
+          " CommitMemory is (" + commitMemory + ")"); 
+      return stallShuffle;
+    }
+    
+    // Allow the in-memory shuffle to progress
+    LOG.debug(srcAttemptIdentifier + ": Proceeding with shuffle since usedMemory ("
+        + usedMemory + ") is less than memoryLimit (" + memoryLimit + ")."
+        + " CommitMemory is (" + commitMemory + ")");
+    return unconditionalReserve(srcAttemptIdentifier, requestedSize, true);
+  }
+  
+  /**
+   * Unconditional Reserve is used by the Memory-to-Memory thread
+   */
+  private synchronized MapOutput unconditionalReserve(
+      InputAttemptIdentifier srcAttemptIdentifier, long requestedSize, boolean primaryMapOutput) throws
+      IOException {
+    usedMemory += requestedSize;
+    return MapOutput.createMemoryMapOutput(srcAttemptIdentifier, this, (int)requestedSize,
+        primaryMapOutput);
+  }
+  
+  synchronized void unreserve(long size) {
+    commitMemory -= size;
+    usedMemory -= size;
+  }
+
+  public synchronized void closeInMemoryFile(MapOutput mapOutput) { 
+    inMemoryMapOutputs.add(mapOutput);
+    LOG.info("closeInMemoryFile -> map-output of size: " + mapOutput.getSize()
+        + ", inMemoryMapOutputs.size() -> " + inMemoryMapOutputs.size()
+        + ", commitMemory -> " + commitMemory + ", usedMemory ->" + usedMemory);
+
+    commitMemory+= mapOutput.getSize();
+
+    synchronized (inMemoryMerger) {
+      // Can hang if mergeThreshold is really low.
+      // TODO Can avoid spilling in case total input size is between
+      // mergeThreshold and total available size.
+      if (!inMemoryMerger.isInProgress() && commitMemory >= mergeThreshold) {
+        LOG.info("Starting inMemoryMerger's merge since commitMemory=" +
+            commitMemory + " > mergeThreshold=" + mergeThreshold + 
+            ". Current usedMemory=" + usedMemory);
+        inMemoryMapOutputs.addAll(inMemoryMergedMapOutputs);
+        inMemoryMergedMapOutputs.clear();
+        inMemoryMerger.startMerge(inMemoryMapOutputs);
+      } 
+    }
+
+    // This should likely run a Combiner.
+    if (memToMemMerger != null) {
+      synchronized (memToMemMerger) {
+        if (!memToMemMerger.isInProgress() && 
+            inMemoryMapOutputs.size() >= memToMemMergeOutputsThreshold) {
+          memToMemMerger.startMerge(inMemoryMapOutputs);
+        }
+      }
+    }
+  }
+  
+  
+  public synchronized void closeInMemoryMergedFile(MapOutput mapOutput) {
+    inMemoryMergedMapOutputs.add(mapOutput);
+    LOG.info("closeInMemoryMergedFile -> size: " + mapOutput.getSize() + 
+             ", inMemoryMergedMapOutputs.size() -> " + 
+             inMemoryMergedMapOutputs.size());
+  }
+  
+  public synchronized void closeOnDiskFile(FileChunk file) {
+    onDiskMapOutputs.add(file);
+
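+    // Illustrative example (assumed numbers): with ioSortFactor=100 the disk-to-disk merge
+    // below kicks in once 199 on-disk files exist; merging 100 of them into one brings the
+    // total back to 100, so a single pass keeps the file count at or below the sort factor.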
+    synchronized (onDiskMerger) {
+      if (!onDiskMerger.isInProgress() &&
+          onDiskMapOutputs.size() >= (2 * ioSortFactor - 1)) {
+        onDiskMerger.startMerge(onDiskMapOutputs);
+      }
+    }
+  }
+
+  /**
+   * Should <b>only</b> be used after the Shuffle phase is complete; otherwise it can
+   * return an invalid state since a merge may not be in progress due to
+   * inadequate inputs.
+   * 
+   * @return true if the merge process is complete, otherwise false
+   */
+  @Private
+  public boolean isMergeComplete() {
+    return finalMergeComplete;
+  }
+  
+  public TezRawKeyValueIterator close() throws Throwable {
+    // Wait for on-going merges to complete
+    if (memToMemMerger != null) { 
+      memToMemMerger.close();
+    }
+    inMemoryMerger.close();
+    onDiskMerger.close();
+    
+    List<MapOutput> memory = 
+      new ArrayList<MapOutput>(inMemoryMergedMapOutputs);
+    inMemoryMergedMapOutputs.clear();
+    memory.addAll(inMemoryMapOutputs);
+    inMemoryMapOutputs.clear();
+    List<FileChunk> disk = new ArrayList<FileChunk>(onDiskMapOutputs);
+    onDiskMapOutputs.clear();
+    TezRawKeyValueIterator kvIter = finalMerge(conf, rfs, memory, disk);
+    this.finalMergeComplete = true;
+    return kvIter;
+  }
+   
+  void runCombineProcessor(TezRawKeyValueIterator kvIter, Writer writer)
+      throws IOException, InterruptedException {
+    combiner.combine(kvIter, writer);
+  }
+
+  /**
+   * Merges multiple in-memory segments into another in-memory segment
+   */
+  private class IntermediateMemoryToMemoryMerger 
+  extends MergeThread<MapOutput> {
+    
+    public IntermediateMemoryToMemoryMerger(MergeManager manager, 
+                                            int mergeFactor) {
+      super(manager, mergeFactor, exceptionReporter);
+      setName("MemToMemMerger [" + TezUtilsInternal
+          .cleanVertexName(inputContext.getSourceVertexName()) + "]");
+      setDaemon(true);
+    }
+
+    @Override
+    public void merge(List<MapOutput> inputs) throws IOException {
+      if (inputs == null || inputs.size() == 0) {
+        return;
+      }
+
+      InputAttemptIdentifier dummyMapId = inputs.get(0).getAttemptIdentifier(); 
+      List<Segment> inMemorySegments = new ArrayList<Segment>();
+      long mergeOutputSize = 
+        createInMemorySegments(inputs, inMemorySegments, 0);
+      int noInMemorySegments = inMemorySegments.size();
+      
+      MapOutput mergedMapOutputs = 
+        unconditionalReserve(dummyMapId, mergeOutputSize, false);
+      
+      Writer writer = 
+        new InMemoryWriter(mergedMapOutputs.getArrayStream());
+
+      LOG.info("Initiating Memory-to-Memory merge with " + noInMemorySegments +
+               " segments of total-size: " + mergeOutputSize);
+
+      // Nothing will be materialized to disk because the sort factor is being
+      // set to the number of in memory segments.
+      // TODO Is this doing any combination ?
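+      // Illustrative: merging, say, 50 in-memory segments with the sort factor also set to
+      // 50 completes in a single pass, so the tmpDir path passed below is never used for
+      // intermediate spills.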
+      TezRawKeyValueIterator rIter = 
+        TezMerger.merge(conf, rfs,
+                       ConfigUtils.getIntermediateInputKeyClass(conf),
+                       ConfigUtils.getIntermediateInputValueClass(conf),
+                       inMemorySegments, inMemorySegments.size(),
+                       new Path(inputContext.getUniqueIdentifier()),
+                       (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
+                       nullProgressable, null, null, null, null); 
+      TezMerger.writeFile(rIter, writer, nullProgressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
+      writer.close();
+
+      LOG.info(inputContext.getUniqueIdentifier() +  
+               " Memory-to-Memory merge of the " + noInMemorySegments +
+               " files in-memory complete.");
+
+      // Note the output of the merge
+      closeInMemoryMergedFile(mergedMapOutputs);
+    }
+  }
+  
+  /**
+   * Merges multiple in-memory segments into a disk segment
+   */
+  private class InMemoryMerger extends MergeThread<MapOutput> {
+    
+    public InMemoryMerger(MergeManager manager) {
+      super(manager, Integer.MAX_VALUE, exceptionReporter);
+      setName("MemtoDiskMerger [" + TezUtilsInternal
+          .cleanVertexName(inputContext.getSourceVertexName()) + "]");
+      setDaemon(true);
+    }
+    
+    @Override
+    public void merge(List<MapOutput> inputs) throws IOException, InterruptedException {
+      if (inputs == null || inputs.size() == 0) {
+        return;
+      }
+      
+      numMemToDiskMerges.increment(1);
+      
+      //Name this output file the same as the first file in the current list
+      //of in-memory files (that name is guaranteed to be absent on disk, so
+      //we don't overwrite a previously created spill). Also, we need to
+      //create the output file now since it is not guaranteed that this file
+      //will be present after merge is called (we delete empty files as soon
+      //as we see them in the merge method).
+
+      //figure out the mapId 
+      InputAttemptIdentifier srcTaskIdentifier = inputs.get(0).getAttemptIdentifier();
+
+      List<Segment> inMemorySegments = new ArrayList<Segment>();
+      long mergeOutputSize = 
+        createInMemorySegments(inputs, inMemorySegments,0);
+      int noInMemorySegments = inMemorySegments.size();
+
+      // TODO Maybe track serialized vs deserialized bytes.
+      
+      // All disk writes done by this merge are overhead - due to the lack of
+      // adequate memory to keep all segments in memory.
+      Path outputPath = mapOutputFile.getInputFileForWrite(
+          srcTaskIdentifier.getInputIdentifier().getInputIndex(),
+          mergeOutputSize).suffix(Constants.MERGED_OUTPUT_PREFIX);
+
+      Writer writer = null;
+      long outFileLen = 0;
+      try {
+        writer =
+            new Writer(conf, rfs, outputPath,
+                (Class)ConfigUtils.getIntermediateInputKeyClass(conf),
+                (Class)ConfigUtils.getIntermediateInputValueClass(conf),
+                codec, null, null);
+
+        TezRawKeyValueIterator rIter = null;
+        LOG.info("Initiating in-memory merge with " + noInMemorySegments + 
+            " segments...");
+
+        // Nothing actually materialized to disk - controlled by setting sort-factor to #segments.
+        rIter = TezMerger.merge(conf, rfs,
+            (Class)ConfigUtils.getIntermediateInputKeyClass(conf),
+            (Class)ConfigUtils.getIntermediateInputValueClass(conf),
+            inMemorySegments, inMemorySegments.size(),
+            new Path(inputContext.getUniqueIdentifier()),
+            (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
+            nullProgressable, spilledRecordsCounter, null, additionalBytesRead, null);
+        // spilledRecordsCounter is tracking the number of keys that will be
+        // read from each of the segments being merged - which is essentially
+        // what will be written to disk.
+
+        if (null == combiner) {
+          TezMerger.writeFile(rIter, writer, nullProgressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
+        } else {
+          // TODO Counters for Combine
+          runCombineProcessor(rIter, writer);
+        }
+        writer.close();
+        additionalBytesWritten.increment(writer.getCompressedLength());
+        writer = null;
+
+        outFileLen = localFS.getFileStatus(outputPath).getLen();
+        LOG.info(inputContext.getUniqueIdentifier() +
+            " Merge of the " + noInMemorySegments +
+            " files in-memory complete." +
+            " Local file is " + outputPath + " of size " +
+            outFileLen);
+      } catch (IOException e) { 
+        //make sure that we delete the on-disk output file that we created
+        //earlier for this merge
+        localFS.delete(outputPath, true);
+        throw e;
+      } finally {
+        if (writer != null) {
+          writer.close();
+        }
+      }
+
+      // Note the output of the merge
+      closeOnDiskFile(new FileChunk(outputPath, 0, outFileLen, false));
+    }
+
+  }
+
+  /**
+   * Merges multiple on-disk segments
+   */
+  private class OnDiskMerger extends MergeThread<FileChunk> {
+
+    public OnDiskMerger(MergeManager manager) {
+      super(manager, ioSortFactor, exceptionReporter);
+      setName("DiskToDiskMerger [" + TezUtilsInternal
+          .cleanVertexName(inputContext.getSourceVertexName()) + "]");
+      setDaemon(true);
+    }
+    
+    @Override
+    public void merge(List<FileChunk> inputs) throws IOException {
+      // sanity check
+      if (inputs == null || inputs.isEmpty()) {
+        LOG.info("No ondisk files to merge...");
+        return;
+      }
+      numDiskToDiskMerges.increment(1);
+      
+      long approxOutputSize = 0;
+      int bytesPerSum = 
+        conf.getInt("io.bytes.per.checksum", 512);
+      
+      LOG.info("OnDiskMerger: We have  " + inputs.size() + 
+               " map outputs on disk. Triggering merge...");
+
+      List<Segment> inputSegments = new ArrayList<Segment>(inputs.size());
+
+      // 1. Prepare the list of files to be merged.
+      for (FileChunk fileChunk : inputs) {
+        final long offset = fileChunk.getOffset();
+        final long size = fileChunk.getLength();
+        final boolean preserve = fileChunk.preserveAfterUse();
+        final Path file = fileChunk.getPath();
+        approxOutputSize += size;
+        Segment segment = new Segment(conf, rfs, file, offset, size, codec, ifileReadAhead,
+            ifileReadAheadLength, ifileBufferSize, preserve);
+        inputSegments.add(segment);
+      }
+
+      // add the checksum length
+      approxOutputSize += 
+        ChecksumFileSystem.getChecksumLength(approxOutputSize, bytesPerSum);
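+      // Illustrative: assuming ChecksumFileSystem's 4-byte CRC per bytesPerSum (512) byte
+      // chunk, this adds roughly size/512 * 4 bytes, i.e. under 1% of the merged output.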
+
+      // 2. Start the on-disk merge process
+      Path outputPath = 
+        localDirAllocator.getLocalPathForWrite(inputs.get(0).getPath().toString(),
+            approxOutputSize, conf).suffix(Constants.MERGED_OUTPUT_PREFIX);
+      Writer writer = 
+        new Writer(conf, rfs, outputPath, 
+                        (Class)ConfigUtils.getIntermediateInputKeyClass(conf), 
+                        (Class)ConfigUtils.getIntermediateInputValueClass(conf),
+                        codec, null, null);
+      Path tmpDir = new Path(inputContext.getUniqueIdentifier());
+      try {
+        TezRawKeyValueIterator iter = TezMerger.merge(conf, rfs,
+            (Class)ConfigUtils.getIntermediateInputKeyClass(conf),
+            (Class)ConfigUtils.getIntermediateInputValueClass(conf),
+            inputSegments,
+            ioSortFactor, tmpDir,
+            (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
+            nullProgressable, true, spilledRecordsCounter, null,
+            mergedMapOutputsCounter, null);
+
+        // TODO Maybe differentiate between data written because of Merges and
+        // the finalMerge (i.e. final mem available may be different from
+        // initial merge mem)
+        TezMerger.writeFile(iter, writer, nullProgressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
+        writer.close();
+        additionalBytesWritten.increment(writer.getCompressedLength());
+      } catch (IOException e) {
+        localFS.delete(outputPath, true);
+        throw e;
+      }
+
+      final long outputLen = localFS.getFileStatus(outputPath).getLen();
+      closeOnDiskFile(new FileChunk(outputPath, 0, outputLen, false));
+
+      LOG.info(inputContext.getUniqueIdentifier() +
+          " Finished merging " + inputs.size() + 
+          " map output files on disk of total-size " + 
+          approxOutputSize + "." + 
+          " Local output file is " + outputPath + " of size " +
+          outputLen);
+    }
+  }
+  
+  private long createInMemorySegments(List<MapOutput> inMemoryMapOutputs,
+                                      List<Segment> inMemorySegments, 
+                                      long leaveBytes
+                                      ) throws IOException {
+    long totalSize = 0L;
+    // fullSize could come from the RamManager, but files can be
+    // closed but not yet present in inMemoryMapOutputs
+    long fullSize = 0L;
+    for (MapOutput mo : inMemoryMapOutputs) {
+      fullSize += mo.getMemory().length;
+    }
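+    // Illustrative example (assumed numbers): with three 100 MB outputs and leaveBytes=150 MB,
+    // the loop below converts two outputs (200 MB) into segments and leaves one 100 MB output
+    // resident in memory; leaveBytes=0 converts everything.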
+    while(fullSize > leaveBytes) {
+      MapOutput mo = inMemoryMapOutputs.remove(0);
+      byte[] data = mo.getMemory();
+      long size = data.length;
+      totalSize += size;
+      fullSize -= size;
+      IFile.Reader reader = new InMemoryReader(MergeManager.this, 
+                                                   mo.getAttemptIdentifier(),
+                                                   data, 0, (int)size);
+      inMemorySegments.add(new Segment(reader, true, 
+                                            (mo.isPrimaryMapOutput() ? 
+                                            mergedMapOutputsCounter : null)));
+    }
+    return totalSize;
+  }
+
+  class RawKVIteratorReader extends IFile.Reader {
+
+    private final TezRawKeyValueIterator kvIter;
+
+    public RawKVIteratorReader(TezRawKeyValueIterator kvIter, long size)
+        throws IOException {
+      super(null, size, null, spilledRecordsCounter, null, ifileReadAhead,
+          ifileReadAheadLength, ifileBufferSize);
+      this.kvIter = kvIter;
+    }
+    @Override
+    public KeyState readRawKey(DataInputBuffer key) throws IOException {
+      if (kvIter.next()) {
+        final DataInputBuffer kb = kvIter.getKey();
+        final int kp = kb.getPosition();
+        final int klen = kb.getLength() - kp;
+        key.reset(kb.getData(), kp, klen);
+        bytesRead += klen;
+        return KeyState.NEW_KEY;
+      }
+      return KeyState.NO_KEY;
+    }
+    public void nextRawValue(DataInputBuffer value) throws IOException {
+      final DataInputBuffer vb = kvIter.getValue();
+      final int vp = vb.getPosition();
+      final int vlen = vb.getLength() - vp;
+      value.reset(vb.getData(), vp, vlen);
+      bytesRead += vlen;
+    }
+    public long getPosition() throws IOException {
+      return bytesRead;
+    }
+
+    public void close() throws IOException {
+      kvIter.close();
+    }
+  }
+
+  private TezRawKeyValueIterator finalMerge(Configuration job, FileSystem fs,
+                                       List<MapOutput> inMemoryMapOutputs,
+                                       List<FileChunk> onDiskMapOutputs
+                                       ) throws IOException {
+    LOG.info("finalMerge called with " + 
+             inMemoryMapOutputs.size() + " in-memory map-outputs and " + 
+             onDiskMapOutputs.size() + " on-disk map-outputs");
+
+    // merge config params
+    Class keyClass = (Class)ConfigUtils.getIntermediateInputKeyClass(job);
+    Class valueClass = (Class)ConfigUtils.getIntermediateInputValueClass(job);
+    final Path tmpDir = new Path(inputContext.getUniqueIdentifier());
+    final RawComparator comparator =
+      (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(job);
+
+    // segments required to vacate memory
+    List<Segment> memDiskSegments = new ArrayList<Segment>();
+    long inMemToDiskBytes = 0;
+    boolean mergePhaseFinished = false;
+    if (inMemoryMapOutputs.size() > 0) {
+      int srcTaskId = inMemoryMapOutputs.get(0).getAttemptIdentifier().getInputIdentifier().getInputIndex();
+      inMemToDiskBytes = createInMemorySegments(inMemoryMapOutputs, 
+                                                memDiskSegments,
+                                                this.postMergeMemLimit);
+      final int numMemDiskSegments = memDiskSegments.size();
+      if (numMemDiskSegments > 0 &&
+            ioSortFactor > onDiskMapOutputs.size()) {
+        
+        // If we reach here, it implies that we have less than io.sort.factor
+        // disk segments and this will be incremented by 1 (result of the 
+        // memory segments merge). Since this total would still be 
+        // <= io.sort.factor, we will not do any more intermediate merges;
+        // the merge of all these disk segments will be fed directly to the
+        // reduce method.
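+        //
+        // Illustrative example (assumed numbers): with ioSortFactor=100 and 40 on-disk
+        // outputs, spilling the in-memory segments adds one more file for a total of 41,
+        // still below the sort factor, so the final merge can consume them all in one pass.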
+        
+        mergePhaseFinished = true;
+        // must spill to disk, but can't retain in-mem for intermediate merge
+        final Path outputPath = 
+          mapOutputFile.getInputFileForWrite(srcTaskId,
+                                             inMemToDiskBytes).suffix(
+                                                 Constants.MERGED_OUTPUT_PREFIX);
+        final TezRawKeyValueIterator rIter = TezMerger.merge(job, fs, keyClass, valueClass,
+            memDiskSegments, numMemDiskSegments, tmpDir, comparator, nullProgressable,
+            spilledRecordsCounter, null, additionalBytesRead, null);
+        final Writer writer = new Writer(job, fs, outputPath,
+            keyClass, valueClass, codec, null, null);
+        try {
+          TezMerger.writeFile(rIter, writer, nullProgressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
+        } catch (IOException e) {
+          if (null != outputPath) {
+            try {
+              fs.delete(outputPath, true);
+            } catch (IOException ie) {
+              // NOTHING
+            }
+          }
+          throw e;
+        } finally {
+          if (null != writer) {
+            writer.close();
+            additionalBytesWritten.increment(writer.getCompressedLength());
+          }
+        }
+
+        final FileStatus fStatus = localFS.getFileStatus(outputPath);
+        // add to list of final disk outputs.
+        onDiskMapOutputs.add(new FileChunk(outputPath, 0, fStatus.getLen(), false));
+
+        LOG.info("Merged " + numMemDiskSegments + " segments, " +
+                 inMemToDiskBytes + " bytes to disk to satisfy " +
+                 "reduce memory limit");
+        inMemToDiskBytes = 0;
+        memDiskSegments.clear();
+      } else if (inMemToDiskBytes != 0) {
+        LOG.info("Keeping " + numMemDiskSegments + " segments, " +
+                 inMemToDiskBytes + " bytes in memory for " +
+                 "intermediate, on-disk merge");
+      }
+    }
+
+    // segments on disk
+    List<Segment> diskSegments = new ArrayList<Segment>();
+    long onDiskBytes = inMemToDiskBytes;
+    FileChunk[] onDisk = onDiskMapOutputs.toArray(new FileChunk[onDiskMapOutputs.size()]);
+    for (FileChunk fileChunk : onDisk) {
+      final long fileLength = fileChunk.getLength();
+      onDiskBytes += fileLength;
+      LOG.debug("Disk file: " + fileChunk.getPath() + " Length is " + fileLength);
+
+      final Path file = fileChunk.getPath();
+      TezCounter counter =
+          file.toString().endsWith(Constants.MERGED_OUTPUT_PREFIX) ? null : mergedMapOutputsCounter;
+
+      final long fileOffset = fileChunk.getOffset();
+      final boolean preserve = fileChunk.preserveAfterUse();
+      diskSegments.add(new Segment(job, fs, file, fileOffset, fileLength, codec, ifileReadAhead,
+                                   ifileReadAheadLength, ifileBufferSize, preserve, counter));
+    }
+    LOG.info("Merging " + onDisk.length + " files, " +
+             onDiskBytes + " bytes from disk");
+    Collections.sort(diskSegments, new Comparator<Segment>() {
+      public int compare(Segment o1, Segment o2) {
+        if (o1.getLength() == o2.getLength()) {
+          return 0;
+        }
+        return o1.getLength() < o2.getLength() ? -1 : 1;
+      }
+    });
+
+    // build final list of segments from merged backed by disk + in-mem
+    List<Segment> finalSegments = new ArrayList<Segment>();
+    long inMemBytes = createInMemorySegments(inMemoryMapOutputs, 
+                                             finalSegments, 0);
+    LOG.info("Merging " + finalSegments.size() + " segments, " +
+             inMemBytes + " bytes from memory into reduce");
+    if (0 != onDiskBytes) {
+      final int numInMemSegments = memDiskSegments.size();
+      diskSegments.addAll(0, memDiskSegments);
+      memDiskSegments.clear();
+      TezRawKeyValueIterator diskMerge = TezMerger.merge(
+          job, fs, keyClass, valueClass, codec, diskSegments,
+          ioSortFactor, numInMemSegments, tmpDir, comparator,
+          nullProgressable, false, spilledRecordsCounter, null, additionalBytesRead, null);
+      diskSegments.clear();
+      if (0 == finalSegments.size()) {
+        return diskMerge;
+      }
+      finalSegments.add(new Segment(
+            new RawKVIteratorReader(diskMerge, onDiskBytes), true));
+    }
+    // This is doing nothing but creating an iterator over the segments.
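+    // Illustrative reading (assumed interpretation): the on-disk segments, plus any memory
+    // spilled above, were merged into a single stream (diskMerge), which was then wrapped
+    // as one more segment via RawKVIteratorReader; the remaining in-memory segments are
+    // simply layered on top here without any further materialization.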
+    return TezMerger.merge(job, fs, keyClass, valueClass,
+                 finalSegments, finalSegments.size(), tmpDir,
+                 comparator, nullProgressable, spilledRecordsCounter, null,
+                 additionalBytesRead, null);
+  }
+}

