tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [4/9] TEZ-1479. Disambiguate (refactor) between ShuffleInputEventHandlers and Fetchers. (sseth) (cherry picked from commit 7be5830a908602ff91a07d3020f2dddf7705d48f)
Date Wed, 15 Oct 2014 18:59:40 GMT
http://git-wip-us.apache.org/repos/asf/tez/blob/196bcca9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeThread.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeThread.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeThread.java
new file mode 100644
index 0000000..bc6ea9a
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeThread.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+abstract class MergeThread<T> extends Thread {
+  /*
+   * Background thread that runs merges on request. startMerge() hands it up to
+   * mergeFactor inputs and wakes it; run() then calls merge() on those inputs
+   * and wakes any thread blocked in waitForMerge() once the merge has ended.
+   * All hand-off state is guarded by this object's monitor.
+   */
+  private static final Log LOG = LogFactory.getLog(MergeThread.class);
+
+  private volatile boolean inProgress = false; // set by startMerge(), reset in run()'s finally block
+  private final List<T> inputs = new ArrayList<T>(); // inputs of the current merge; refilled by startMerge()
+  protected final MergeManager manager;
+  private final ExceptionReporter reporter; // receives any non-interrupt failure thrown by merge()
+  private boolean closed = false; // once true, startMerge() becomes a no-op
+  private final int mergeFactor; // upper bound on the number of inputs taken per merge
+  /**
+   * @param manager the merge manager, kept for use by subclasses
+   * @param mergeFactor maximum number of inputs consumed by a single merge
+   * @param reporter sink for exceptions thrown by merge()
+   */
+  public MergeThread(MergeManager manager, int mergeFactor,
+                     ExceptionReporter reporter) {
+    this.manager = manager;
+    this.mergeFactor = mergeFactor;
+    this.reporter = reporter;
+  }
+  /**
+   * Stops accepting new merges, waits for a merge that is in progress to end,
+   * and then interrupts this thread so that run() returns.
+   *
+   * @throws InterruptedException if the caller is interrupted while waiting
+   */
+  public synchronized void close() throws InterruptedException {
+    closed = true;
+    waitForMerge();
+    interrupt();
+  }
+  /** @return true while a merge has been requested and has not yet ended. */
+  public synchronized boolean isInProgress() {
+    return inProgress;
+  }
+  /**
+   * Requests a merge, unless close() has been called. Up to mergeFactor
+   * elements are moved out of the given set (they are removed from it) into
+   * this thread's input list, and the thread is woken up. Elements beyond
+   * mergeFactor stay in the caller's set.
+   *
+   * @param inputs candidate inputs; modified by this call
+   */
+  public synchronized void startMerge(Set<T> inputs) {
+    if (!closed) {
+      this.inputs.clear();
+      inProgress = true;
+      Iterator<T> iter=inputs.iterator();
+      for (int ctr = 0; iter.hasNext() && ctr < mergeFactor; ++ctr) {
+        this.inputs.add(iter.next());
+        iter.remove();
+      }
+      LOG.info(getName() + ": Starting merge with " + this.inputs.size() + 
+               " segments, while ignoring " + inputs.size() + " segments");
+      notifyAll();
+    }
+  }
+  /**
+   * Blocks until no merge is in progress.
+   *
+   * @throws InterruptedException if the caller is interrupted while waiting
+   */
+  public synchronized void waitForMerge() throws InterruptedException {
+    while (inProgress) {
+      wait();
+    }
+  }
+  /**
+   * Main loop: waits for startMerge(), then runs merge() outside the monitor.
+   * The loop ends when the thread is interrupted, or when merge() throws
+   * anything else, which is first passed to the reporter. After every
+   * iteration, including the failing one, the inputs are cleared, inProgress
+   * is reset and waiters are notified.
+   */
+  public void run() {
+    while (true) {
+      try {
+        // Wait for notification to start the merge...
+        synchronized (this) {
+          while (!inProgress) {
+            wait();
+          }
+        }
+
+        // Merge
+        merge(inputs);
+      } catch (InterruptedException ie) {
+        return;
+      } catch(Throwable t) {
+        reporter.reportException(t);
+        return;
+      } finally {
+        synchronized (this) {
+          // Clear inputs
+          inputs.clear();
+          inProgress = false;        
+          notifyAll();
+        }
+      }
+    }
+  }
+  /**
+   * Performs one merge over the inputs selected by startMerge(). Called from
+   * run() without holding this object's monitor.
+   *
+   * @param inputs the inputs to merge
+   */
+  public abstract void merge(List<T> inputs) 
+      throws IOException, InterruptedException;
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/196bcca9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
new file mode 100644
index 0000000..5e4f668
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
@@ -0,0 +1,485 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
+
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.crypto.SecretKey;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.TezRuntimeFrameworkConfigs;
+import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.common.TezRuntimeUtils;
+import org.apache.tez.runtime.library.common.combine.Combiner;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.apache.tez.runtime.library.exceptions.InputAlreadyClosedException;
+import org.apache.tez.runtime.library.common.shuffle.HttpConnection;
+import org.apache.tez.runtime.library.common.shuffle.HttpConnection.HttpConnectionParams;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Usage: Create instance, setInitialMemoryAllocated(long), run()
+ *
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class Shuffle implements ExceptionReporter {
+  
+  private static final Log LOG = LogFactory.getLog(Shuffle.class);
+  private static final int PROGRESS_FREQUENCY = 2000;
+  
+  private final Configuration conf;
+  private final InputContext inputContext;
+  
+  private final ShuffleClientMetrics metrics;
+
+  private final ShuffleInputEventHandlerOrderedGrouped eventHandler;
+  private final ShuffleScheduler scheduler;
+  private final MergeManager merger;
+
+  private final SecretKey jobTokenSecret;
+  private final CompressionCodec codec;
+  private final boolean ifileReadAhead;
+  private final int ifileReadAheadLength;
+  private final int numFetchers;
+  private final boolean localDiskFetchEnabled;
+  
+  private Throwable throwable = null;
+  private String throwingThreadName = null;
+
+  private final RunShuffleCallable runShuffleCallable;
+  private volatile ListenableFuture<TezRawKeyValueIterator> runShuffleFuture;
+  private final ListeningExecutorService executor;
+  
+  private final String srcNameTrimmed;
+  
+  private final List<FetcherOrderedGrouped> fetchers;
+  private final HttpConnectionParams httpConnectionParams;
+  
+  private AtomicBoolean isShutDown = new AtomicBoolean(false);
+  private AtomicBoolean fetchersClosed = new AtomicBoolean(false);
+  private AtomicBoolean schedulerClosed = new AtomicBoolean(false);
+  private AtomicBoolean mergerClosed = new AtomicBoolean(false);
+
+  /**
+   * Builds the scheduler, merge manager and event handler for one ordered,
+   * grouped shuffle input, plus the single-thread executor that will later run
+   * the shuffle. No fetcher or merge work is started here; see {@link #run()}.
+   *
+   * @param inputContext context of the input this shuffle serves
+   * @param conf configuration to read shuffle settings from
+   * @param numInputs number of inputs to fetch; also caps the number of fetchers
+   * @param initialMemoryAvailable memory budget handed to the merge manager
+   * @throws IOException if one of the collaborators cannot be set up
+   */
+  public Shuffle(InputContext inputContext, Configuration conf, int numInputs,
+      long initialMemoryAvailable) throws IOException {
+    this.inputContext = inputContext;
+    this.conf = conf;
+    this.httpConnectionParams =
+        ShuffleUtils.constructHttpShuffleConnectionParams(conf);
+    this.metrics = new ShuffleClientMetrics(inputContext.getDAGName(),
+        inputContext.getTaskVertexName(), inputContext.getTaskIndex(),
+        this.conf, UserGroupInformation.getCurrentUser().getShortUserName());
+    
+    this.srcNameTrimmed = TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName());
+    
+    this.jobTokenSecret = ShuffleUtils
+        .getJobTokenSecretFromTokenBytes(inputContext
+            .getServiceConsumerMetaData(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID));
+    
+    if (ConfigUtils.isIntermediateInputCompressed(conf)) {
+      Class<? extends CompressionCodec> codecClass =
+          ConfigUtils.getIntermediateInputCompressorClass(conf, DefaultCodec.class);
+      codec = ReflectionUtils.newInstance(codecClass, conf);
+    } else {
+      codec = null;
+    }
+    this.ifileReadAhead = conf.getBoolean(
+        TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD,
+        TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT);
+    if (this.ifileReadAhead) {
+      this.ifileReadAheadLength = conf.getInt(
+          TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES,
+          TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT);
+    } else {
+      this.ifileReadAheadLength = 0;
+    }
+    
+    Combiner combiner = TezRuntimeUtils.instantiateCombiner(conf, inputContext);
+    
+    FileSystem localFS = FileSystem.getLocal(this.conf);
+    LocalDirAllocator localDirAllocator = 
+        new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
+
+    // TODO TEZ Get rid of Map / Reduce references.
+    TezCounter shuffledInputsCounter = 
+        inputContext.getCounters().findCounter(TaskCounter.NUM_SHUFFLED_INPUTS);
+    TezCounter reduceShuffleBytes =
+        inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES);
+    TezCounter reduceDataSizeDecompressed = inputContext.getCounters().findCounter(
+        TaskCounter.SHUFFLE_BYTES_DECOMPRESSED);
+    TezCounter failedShuffleCounter =
+        inputContext.getCounters().findCounter(TaskCounter.NUM_FAILED_SHUFFLE_INPUTS);
+    TezCounter spilledRecordsCounter = 
+        inputContext.getCounters().findCounter(TaskCounter.SPILLED_RECORDS);
+    TezCounter reduceCombineInputCounter =
+        inputContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS);
+    TezCounter mergedMapOutputsCounter =
+        inputContext.getCounters().findCounter(TaskCounter.MERGED_MAP_OUTPUTS);
+    TezCounter bytesShuffedToDisk = inputContext.getCounters().findCounter(
+        TaskCounter.SHUFFLE_BYTES_TO_DISK);
+    TezCounter bytesShuffedToDiskDirect = inputContext.getCounters().findCounter(
+        TaskCounter.SHUFFLE_BYTES_DISK_DIRECT);
+    TezCounter bytesShuffedToMem = inputContext.getCounters().findCounter(
+        TaskCounter.SHUFFLE_BYTES_TO_MEM);
+    
+    // Each field is separated by ", " so that the codec name and the read-ahead
+    // flag do not run together in the log line.
+    LOG.info("Shuffle assigned with " + numInputs + " inputs" + ", codec: "
+        + (codec == null ? "None" : codec.getClass().getName())
+        + ", ifileReadAhead: " + ifileReadAhead);
+
+    boolean sslShuffle = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL,
+      TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL_DEFAULT);
+    scheduler = new ShuffleScheduler(
+          this.inputContext,
+          this.conf,
+          numInputs,
+          this,
+          shuffledInputsCounter,
+          reduceShuffleBytes,
+          reduceDataSizeDecompressed,
+          failedShuffleCounter,
+          bytesShuffedToDisk,
+          bytesShuffedToDiskDirect,
+          bytesShuffedToMem);
+
+    merger = new MergeManager(
+          this.conf,
+          localFS,
+          localDirAllocator,
+          inputContext,
+          combiner,
+          spilledRecordsCounter,
+          reduceCombineInputCounter,
+          mergedMapOutputsCounter,
+          this,
+          initialMemoryAvailable,
+          codec,
+          ifileReadAhead,
+          ifileReadAheadLength);
+
+    eventHandler= new ShuffleInputEventHandlerOrderedGrouped(
+        inputContext,
+        scheduler,
+        sslShuffle);
+    
+    ExecutorService rawExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
+        .setDaemon(true).setNameFormat("ShuffleAndMergeRunner [" + srcNameTrimmed + "]").build());
+
+    int configuredNumFetchers = 
+        conf.getInt(
+            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES, 
+            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES_DEFAULT);
+    // Never start more fetchers than there are inputs to fetch.
+    numFetchers = Math.min(configuredNumFetchers, numInputs);
+    LOG.info("Num fetchers being started: " + numFetchers);
+    fetchers = Lists.newArrayListWithCapacity(numFetchers);
+    localDiskFetchEnabled = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH,
+        TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH_DEFAULT);
+
+    executor = MoreExecutors.listeningDecorator(rawExecutor);
+    runShuffleCallable = new RunShuffleCallable();
+  }
+
+  /**
+   * Passes the given events to the shuffle event handler. Once this Shuffle has
+   * been shut down the events are only counted in a log line and then dropped.
+   *
+   * @param events the events to process
+   * @throws IOException propagated from the event handler
+   */
+  public void handleEvents(List<Event> events) throws IOException {
+    if (isShutDown.get()) {
+      LOG.info("Ignoring events since already shutdown. EventCount: " + events.size());
+      return;
+    }
+    eventHandler.handleEvents(events);
+  }
+  
+  /**
+   * Indicates whether the Shuffle and Merge processing is complete.
+   * @return false if run() has not been called yet or the shuffle task is still
+   *         running; true once the shuffle task is done. An error reported
+   *         through reportException is rethrown instead of being returned.
+   * @throws InterruptedException if the reported error is an InterruptedException
+   * @throws IOException if the reported error is an IOException
+   * @throws InputAlreadyClosedException if shutdown() has already been called
+   * @throws TezException declared on the signature; presumably covers
+   *         InputAlreadyClosedException (TODO confirm)
+   */
+  // ZZZ Deal with these methods.
+  public boolean isInputReady() throws IOException, InterruptedException, TezException {
+    if (isShutDown.get()) {
+      throw new InputAlreadyClosedException();
+    }
+    if (throwable != null) {
+      handleThrowable(throwable);
+    }
+    if (runShuffleFuture == null) {
+      return false;
+    }
+    // Don't need to check merge status, since runShuffleFuture will only
+    // complete once merge is complete.
+    return runShuffleFuture.isDone();
+  }
+
+  /**
+   * Rethrows the given throwable. IOException and InterruptedException are
+   * rethrown as they are; anything else is wrapped in an
+   * UndeclaredThrowableException. This method never returns normally.
+   *
+   * @param t the throwable to rethrow
+   * @throws IOException if t is an IOException
+   * @throws InterruptedException if t is an InterruptedException
+   */
+  private void handleThrowable(Throwable t) throws IOException, InterruptedException {
+    if (t instanceof IOException) {
+      throw (IOException) t;
+    }
+    if (t instanceof InterruptedException) {
+      throw (InterruptedException) t;
+    }
+    throw new UndeclaredThrowableException(t);
+  }
+
+  /**
+   * Waits for the Shuffle and Merge to complete, and returns an iterator over the input.
+   * Must only be called after run(); otherwise the state check fails.
+   * @return an iterator over the fetched input.
+   * @throws IOException if the shuffle task or a reporting thread failed with an IOException
+   * @throws InterruptedException if the wait is interrupted, or the failure was an
+   *         InterruptedException
+   * @throws InputAlreadyClosedException if shutdown() was called by the time the wait ends
+   * @throws TezException declared on the signature; presumably covers
+   *         InputAlreadyClosedException (TODO confirm)
+   */
+  // ZZZ Deal with these methods.
+  public TezRawKeyValueIterator waitForInput() throws IOException, InterruptedException,
+      TezException {
+    Preconditions.checkState(runShuffleFuture != null,
+        "waitForInput can only be called after run");
+    TezRawKeyValueIterator kvIter = null;
+    try {
+      kvIter = runShuffleFuture.get();
+    } catch (ExecutionException e) {
+      Throwable cause = e.getCause();
+      handleThrowable(cause); // always throws, so kvIter is never returned as null from this path
+    }
+    if (isShutDown.get()) {
+      throw new InputAlreadyClosedException();
+    }
+    if (throwable != null) {
+      handleThrowable(throwable);
+    }
+    return kvIter;
+  }
+
+  /**
+   * Starts the merge manager and submits the shuffle task to the executor. The
+   * executor is shut down right after submission, so it accepts no further
+   * tasks; the submitted task still runs to completion.
+   *
+   * @throws IOException propagated from starting the merge manager
+   */
+  public void run() throws IOException {
+    merger.configureAndStart();
+    ListenableFuture<TezRawKeyValueIterator> submitted = executor.submit(runShuffleCallable);
+    runShuffleFuture = submitted;
+    Futures.addCallback(submitted, new ShuffleRunnerFutureCallback());
+    executor.shutdown();
+  }
+
+  /**
+   * Shuts this Shuffle down. Only the first call has any effect: it cancels the
+   * running shuffle task, if one was started, and then cleans up the fetchers,
+   * the scheduler and the merger, ignoring errors.
+   */
+  public void shutdown() {
+    if (isShutDown.getAndSet(true)) {
+      return;
+    }
+    LOG.info("Shutting down Shuffle for source: " + srcNameTrimmed);
+    // run() may never have been called, in which case there is no task to
+    // cancel. The volatile field is read once so the check and the call agree.
+    ListenableFuture<TezRawKeyValueIterator> future = runShuffleFuture;
+    if (future != null) {
+      // Interrupt so that the scheduler / merger sees this interrupt.
+      future.cancel(true);
+    }
+    cleanupIgnoreErrors();
+  }
+
+  /**
+   * The task that drives one shuffle: starts the fetchers, waits for the
+   * scheduler to report completion, stops fetchers and scheduler, and runs the
+   * final merge.
+   */
+  private class RunShuffleCallable implements Callable<TezRawKeyValueIterator> {
+    /**
+     * @return the iterator produced by the final merge
+     * @throws IOException as a ShuffleError if another thread reported a failure
+     *         or the final merge failed
+     * @throws InterruptedException if interrupted while waiting or cleaning up
+     */
+    @Override
+    public TezRawKeyValueIterator call() throws IOException, InterruptedException {
+
+      // NOTE(review): this block locks the callable, while cleanupFetchers()
+      // locks the enclosing Shuffle; confirm whether both should share a monitor.
+      synchronized (this) {
+        for (int i = 0; i < numFetchers; ++i) {
+          FetcherOrderedGrouped
+              fetcher = new FetcherOrderedGrouped(httpConnectionParams, scheduler, merger,
+            metrics, Shuffle.this, jobTokenSecret, ifileReadAhead, ifileReadAheadLength,
+            codec, inputContext, conf, localDiskFetchEnabled);
+          fetchers.add(fetcher);
+          fetcher.start();
+        }
+      }
+
+      
+      while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {
+        // reportException() writes throwable / throwingThreadName while holding
+        // the Shuffle monitor, so they are read under that same monitor here.
+        synchronized (Shuffle.this) {
+          if (throwable != null) {
+            throw new ShuffleError("error in shuffle in " + throwingThreadName,
+                                   throwable);
+          }
+        }
+      }
+      
+      // Stop the map-output fetcher threads
+      cleanupFetchers(false);
+      
+      // stop the scheduler
+      cleanupShuffleScheduler(false);
+
+      // Finish the on-going merges...
+      TezRawKeyValueIterator kvIter = null;
+      try {
+        kvIter = merger.close();
+      } catch (Throwable e) {
+        throw new ShuffleError("Error while doing final merge " , e);
+      }
+      
+      // Sanity check
+      synchronized (Shuffle.this) {
+        if (throwable != null) {
+          throw new ShuffleError("error in shuffle in " + throwingThreadName,
+                                 throwable);
+        }
+      }
+
+      inputContext.inputIsReady();
+      LOG.info("merge complete for input vertex : " + inputContext.getSourceVertexName());
+      return kvIter;
+    }
+  }
+  
+  /**
+   * Shuts down every fetcher and, when SSL shuffle is enabled, the shared SSL
+   * factory. Only the first call does any work; later calls return at once.
+   *
+   * @param ignoreErrors if true, interrupts seen while shutting fetchers down
+   *        are logged and dropped; if false, the first one is rethrown after
+   *        all fetchers have been processed and the rest are only logged
+   * @throws InterruptedException the first interrupt seen, when ignoreErrors is false
+   */
+  private synchronized void cleanupFetchers(boolean ignoreErrors) throws InterruptedException {
+    if (fetchersClosed.getAndSet(true)) {
+      return;
+    }
+    // Stop the fetcher threads
+    InterruptedException firstInterrupt = null;
+    for (FetcherOrderedGrouped fetcher : fetchers) {
+      try {
+        fetcher.shutDown();
+      } catch (InterruptedException e) {
+        if (ignoreErrors) {
+          LOG.info("Interrupted while shutting down fetchers. Ignoring.");
+        } else if (firstInterrupt == null) {
+          // Remember the first interrupt; it is rethrown once cleanup is done.
+          firstInterrupt = e;
+        } else {
+          LOG.warn("Ignoring exception while shutting down fetcher since a previous one was seen and will be thrown "
+              + e);
+        }
+      }
+    }
+    fetchers.clear();
+    //All threads are shutdown.  It is safe to shutdown SSL factory
+    if (httpConnectionParams.isSSLShuffleEnabled()) {
+      HttpConnection.cleanupSSLFactory();
+    }
+    // throw only the first exception while attempting to shutdown.
+    if (firstInterrupt != null) {
+      throw firstInterrupt;
+    }
+  }
+
+  /**
+   * Closes the scheduler. Only the first call does any work.
+   *
+   * @param ignoreErrors if true, an interrupt during close is logged and dropped
+   * @throws InterruptedException if close is interrupted and ignoreErrors is false
+   */
+  private void cleanupShuffleScheduler(boolean ignoreErrors) throws InterruptedException {
+    if (schedulerClosed.getAndSet(true)) {
+      return;
+    }
+    try {
+      scheduler.close();
+    } catch (InterruptedException e) {
+      if (!ignoreErrors) {
+        throw e;
+      }
+      LOG.info("Interrupted while attempting to close the scheduler during cleanup. Ignoring");
+    }
+  }
+
+  /**
+   * Closes the merge manager. Only the first call does any work.
+   *
+   * @param ignoreErrors if true, a failure during close is logged and dropped
+   * @throws Throwable whatever close threw, when ignoreErrors is false
+   */
+  private void cleanupMerger(boolean ignoreErrors) throws Throwable {
+    if (mergerClosed.getAndSet(true)) {
+      return;
+    }
+    try {
+      merger.close();
+    } catch (Throwable e) {
+      if (!ignoreErrors) {
+        throw e;
+      }
+      LOG.info("Exception while trying to shutdown merger, Ignoring", e);
+    }
+  }
+
+  /**
+   * Best-effort cleanup of the fetchers, the scheduler and the merger. Each
+   * step is attempted on its own, so an unexpected failure in one step does
+   * not skip the remaining ones. Failures are logged and never propagated.
+   */
+  private void cleanupIgnoreErrors() {
+    try {
+      cleanupFetchers(true);
+    } catch (Throwable t) {
+      LOG.info("Error while cleaning up fetchers. Ignoring", t);
+    }
+    try {
+      cleanupShuffleScheduler(true);
+    } catch (Throwable t) {
+      LOG.info("Error while cleaning up the scheduler. Ignoring", t);
+    }
+    try {
+      cleanupMerger(true);
+    } catch (Throwable t) {
+      LOG.info("Error while cleaning up the merger. Ignoring", t);
+    }
+  }
+
+  /**
+   * Records a failure seen by one of the shuffle threads. Only the first
+   * reported throwable is kept, together with the name of the calling thread;
+   * later reports are dropped.
+   *
+   * @param t the failure to record
+   */
+  @Private
+  public synchronized void reportException(Throwable t) {
+    if (throwable != null) {
+      return;
+    }
+    throwable = t;
+    throwingThreadName = Thread.currentThread().getName();
+    // Wake up whoever is waiting on the scheduler so that the failure is
+    // noticed right away instead of at the next poll.
+    synchronized (scheduler) {
+      scheduler.notifyAll();
+    }
+  }
+  
+  public static class ShuffleError extends IOException { // wraps a failure seen during shuffle or the final merge
+    private static final long serialVersionUID = 5753909320586607881L;
+
+    ShuffleError(String msg, Throwable t) { // t is kept as the cause
+      super(msg, t);
+    }
+  }
+
+  @Private
+  public static long getInitialMemoryRequirement(Configuration conf, long maxAvailableTaskMemory) {
+    return MergeManager.getInitialMemoryRequirement(conf, maxAvailableTaskMemory); // plain delegation to MergeManager
+  }
+  
+  /**
+   * Callback attached to the shuffle task's future. Success is only logged. A
+   * failure is reported to the input context as a fatal error and followed by
+   * best-effort cleanup, unless the Shuffle was already shut down.
+   */
+  private class ShuffleRunnerFutureCallback implements FutureCallback<TezRawKeyValueIterator> {
+    @Override
+    public void onSuccess(TezRawKeyValueIterator result) {
+      LOG.info("Shuffle Runner thread complete");
+    }
+
+    @Override
+    public void onFailure(Throwable t) {
+      // ZZZ Handle failures during shutdown.
+      if (isShutDown.get()) {
+        LOG.info("Already shutdown. Ignoring error: ",  t);
+        return;
+      }
+      LOG.error("ShuffleRunner failed with error", t);
+      inputContext.fatalError(t, "Shuffle Runner Failed");
+      cleanupIgnoreErrors();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/196bcca9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleClientMetrics.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleClientMetrics.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleClientMetrics.java
new file mode 100644
index 0000000..f297dad
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleClientMetrics.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.Updater;
+import org.apache.tez.common.TezRuntimeFrameworkConfigs;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.Constants;
+import org.apache.tez.runtime.library.common.TezRuntimeUtils;
+
+class ShuffleClientMetrics implements Updater {
+
+  private MetricsRecord shuffleMetrics = null;
+  private int numFailedFetches = 0;
+  private int numSuccessFetches = 0;
+  private long numBytes = 0;
+  private int numThreadsBusy = 0;
+  private final int numCopiers;
+  
+  ShuffleClientMetrics(String dagName, String vertexName, int taskIndex, Configuration conf, 
+      String user) {
+    this.numCopiers = 
+        conf.getInt(
+            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES, 
+            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES_DEFAULT);
+
+    MetricsContext metricsContext = MetricsUtil.getContext(Constants.TEZ);
+    this.shuffleMetrics = 
+      MetricsUtil.createRecord(metricsContext, "shuffleInput");
+    this.shuffleMetrics.setTag("user", user);
+    this.shuffleMetrics.setTag("dagName", dagName);
+    this.shuffleMetrics.setTag("taskId", TezRuntimeUtils.getTaskIdentifier(vertexName, taskIndex));
+    this.shuffleMetrics.setTag("sessionId", 
+        conf.get(
+            TezRuntimeFrameworkConfigs.TEZ_RUNTIME_METRICS_SESSION_ID,
+            TezRuntimeFrameworkConfigs.TEZ_RUNTIME_METRICS_SESSION_ID_DEFAULT));
+    metricsContext.registerUpdater(this);
+  }
+  public synchronized void inputBytes(long numBytes) {
+    this.numBytes += numBytes;
+  }
+  public synchronized void failedFetch() {
+    ++numFailedFetches;
+  }
+  public synchronized void successFetch() {
+    ++numSuccessFetches;
+  }
+  public synchronized void threadBusy() {
+    ++numThreadsBusy;
+  }
+  public synchronized void threadFree() {
+    --numThreadsBusy;
+  }
+  public void doUpdates(MetricsContext unused) {
+    synchronized (this) {
+      shuffleMetrics.incrMetric("shuffle_input_bytes", numBytes);
+      shuffleMetrics.incrMetric("shuffle_failed_fetches", 
+                                numFailedFetches);
+      shuffleMetrics.incrMetric("shuffle_success_fetches", 
+                                numSuccessFetches);
+      if (numCopiers != 0) {
+        shuffleMetrics.setMetric("shuffle_fetchers_busy_percent",
+            100*((float)numThreadsBusy/numCopiers));
+      } else {
+        shuffleMetrics.setMetric("shuffle_fetchers_busy_percent", 0);
+      }
+      numBytes = 0;
+      numSuccessFetches = 0;
+      numFailedFetches = 0;
+    }
+    shuffleMetrics.update();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/196bcca9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleHeader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleHeader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleHeader.java
new file mode 100644
index 0000000..339af57
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleHeader.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * Shuffle Header information that is sent by the TaskTracker and 
+ * deciphered by the Fetcher thread of Reduce task
+ *
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+public class ShuffleHeader implements Writable {
+
+  /** Header info of the shuffle http request/response */
+  public static final String HTTP_HEADER_NAME = "name";
+  public static final String DEFAULT_HTTP_HEADER_NAME = "mapreduce";
+  public static final String HTTP_HEADER_VERSION = "version";
+  public static final String DEFAULT_HTTP_HEADER_VERSION = "1.0.0";
+
+  /** Upper bound on the length of the id string accepted by readFields. */
+  private static final int MAX_ID_LENGTH = 1000;
+
+  String mapId;
+  long uncompressedLength;
+  long compressedLength;
+  int forReduce;
+
+  /** Creates an empty header, to be filled in by {@link #readFields}. */
+  public ShuffleHeader() {
+  }
+
+  /**
+   * @param mapId identifier of the output this header describes
+   * @param compressedLength length of the data in compressed form
+   * @param uncompressedLength length of the data in uncompressed form
+   * @param forReduce partition the data is meant for
+   */
+  public ShuffleHeader(String mapId, long compressedLength,
+      long uncompressedLength, int forReduce) {
+    this.mapId = mapId;
+    this.compressedLength = compressedLength;
+    this.uncompressedLength = uncompressedLength;
+    this.forReduce = forReduce;
+  }
+
+  /** @return the identifier of the output this header describes */
+  public String getMapId() {
+    return mapId;
+  }
+
+  /** @return the partition number, stored in the forReduce field */
+  public int getPartition() {
+    return forReduce;
+  }
+
+  /** @return the uncompressed length */
+  public long getUncompressedLength() {
+    return this.uncompressedLength;
+  }
+
+  /** @return the compressed length */
+  public long getCompressedLength() {
+    return this.compressedLength;
+  }
+
+  /**
+   * Reads the header fields in the order in which {@link #write} emits them:
+   * id, compressed length, uncompressed length, partition.
+   */
+  public void readFields(DataInput in) throws IOException {
+    this.mapId = WritableUtils.readStringSafely(in, MAX_ID_LENGTH);
+    this.compressedLength = WritableUtils.readVLong(in);
+    this.uncompressedLength = WritableUtils.readVLong(in);
+    this.forReduce = WritableUtils.readVInt(in);
+  }
+
+  /**
+   * Writes the header fields: id, compressed length, uncompressed length,
+   * partition.
+   */
+  public void write(DataOutput out) throws IOException {
+    Text.writeString(out, this.mapId);
+    WritableUtils.writeVLong(out, this.compressedLength);
+    WritableUtils.writeVLong(out, this.uncompressedLength);
+    WritableUtils.writeVInt(out, this.forReduce);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/196bcca9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
new file mode 100644
index 0000000..2feeaed
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.BitSet;
+import java.util.List;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ByteString;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.InputFailedEvent;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+public class ShuffleInputEventHandlerOrderedGrouped {
+  
+  private static final Log LOG = LogFactory.getLog(ShuffleInputEventHandlerOrderedGrouped.class);
+
+  // Receives known map outputs, skipped (empty) inputs and obsoleted inputs from this handler.
+  private final ShuffleScheduler scheduler;
+  // Supplies the application id used when building shuffle URIs.
+  private final InputContext inputContext;
+
+  // Largest run duration seen so far in any DataMovementEvent payload.
+  private int maxMapRuntime = 0;
+  // Forwarded to ShuffleUtils.constructBaseURIForShuffleHandler when building the base URI.
+  private final boolean sslShuffle;
+
+  /**
+   * Creates a handler that translates input events into scheduler calls.
+   *
+   * @param inputContext context of the input this handler serves
+   * @param scheduler scheduler that is told about available, empty and obsolete inputs
+   * @param sslShuffle flag passed on when constructing shuffle base URIs
+   */
+  public ShuffleInputEventHandlerOrderedGrouped(InputContext inputContext,
+                                                ShuffleScheduler scheduler, boolean sslShuffle) {
+    this.sslShuffle = sslShuffle;
+    this.scheduler = scheduler;
+    this.inputContext = inputContext;
+  }
+
+  /**
+   * Processes the given events one at a time, in list order.
+   *
+   * @param events events to process
+   * @throws IOException if handling an individual event fails
+   */
+  public void handleEvents(List<Event> events) throws IOException {
+    for (Event incoming : events) {
+      handleEvent(incoming);
+    }
+  }
+  
+  
+  /**
+   * Dispatches a single event by type. Events that are neither a DataMovementEvent
+   * nor an InputFailedEvent are ignored.
+   */
+  private void handleEvent(Event event) throws IOException {
+    if (event instanceof DataMovementEvent) {
+      processDataMovementEvent((DataMovementEvent) event);
+      return;
+    }
+    if (event instanceof InputFailedEvent) {
+      processTaskFailedEvent((InputFailedEvent) event);
+    }
+  }
+
+  /**
+   * Handles one DataMovementEvent.
+   * Parses the payload, forwards a new maximum run duration to the scheduler, and then either
+   * marks the source as complete without fetching (when the payload flags this partition as
+   * empty) or registers the source with the scheduler as a known map output.
+   *
+   * @param dmEvent the event to process
+   * @throws IOException declared by the signature; failures inside the empty-partition branch
+   *         are rethrown as TezUncheckedException instead
+   */
+  private void processDataMovementEvent(DataMovementEvent dmEvent) throws IOException {
+    DataMovementEventPayloadProto shufflePayload;
+    try {
+      shufflePayload = DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(dmEvent.getUserPayload()));
+    } catch (InvalidProtocolBufferException e) {
+      throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e);
+    } 
+    // The event's source index is used as the partition id throughout this method.
+    int partitionId = dmEvent.getSourceIndex();
+    LOG.info("DME srcIdx: " + partitionId + ", targetIdx: " + dmEvent.getTargetIndex()
+        + ", attemptNum: " + dmEvent.getVersion() + ", payload: " + ShuffleUtils.stringify(shufflePayload));
+    // TODO NEWTEZ See if this duration hack can be removed.
+    int duration = shufflePayload.getRunDuration();
+    // Only a new maximum is forwarded to the scheduler.
+    if (duration > maxMapRuntime) {
+      maxMapRuntime = duration;
+      scheduler.informMaxMapRunTime(maxMapRuntime);
+    }
+    if (shufflePayload.hasEmptyPartitions()) {
+      try {
+        // The empty-partition information arrives compressed and is decoded into a BitSet.
+        byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload.getEmptyPartitions());
+        BitSet emptyPartitionsBitSet = TezUtilsInternal.fromByteArray(emptyPartitions);
+        if (emptyPartitionsBitSet.get(partitionId)) {
+          // No path component is supplied for an attempt that will not be fetched.
+          InputAttemptIdentifier srcAttemptIdentifier =
+              new InputAttemptIdentifier(dmEvent.getTargetIndex(), dmEvent.getVersion());
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(
+                "Source partition: " + partitionId + " did not generate any data. SrcAttempt: ["
+                    + srcAttemptIdentifier + "]. Not fetching.");
+          }
+          // A null MapOutput tells the scheduler the input completed without a fetch.
+          scheduler.copySucceeded(srcAttemptIdentifier, null, 0, 0, 0, null);
+          return;
+        }
+      } catch (IOException e) {
+        // This also wraps any IOException thrown by scheduler.copySucceeded above.
+        throw new TezUncheckedException("Unable to set " +
+                "the empty partition to succeeded", e);
+      }
+    }
+
+    // Data exists for this partition: register where it can be fetched from.
+    InputAttemptIdentifier srcAttemptIdentifier =
+        new InputAttemptIdentifier(dmEvent.getTargetIndex(), dmEvent.getVersion(),
+            shufflePayload.getPathComponent());
+
+    URI baseUri = getBaseURI(shufflePayload.getHost(), shufflePayload.getPort(), partitionId);
+    scheduler.addKnownMapOutput(shufflePayload.getHost(), shufflePayload.getPort(),
+        partitionId, baseUri.toString(), srcAttemptIdentifier);
+  }
+  
+  /**
+   * Tells the scheduler that the source attempt named by the event is obsolete.
+   * The attempt is identified by the event's target index and version.
+   */
+  private void processTaskFailedEvent(InputFailedEvent ifEvent) {
+    InputAttemptIdentifier obsoleteAttempt =
+        new InputAttemptIdentifier(ifEvent.getTargetIndex(), ifEvent.getVersion());
+    scheduler.obsoleteInput(obsoleteAttempt);
+    LOG.info("Obsoleting output of src-task: " + obsoleteAttempt);
+  }
+
+  // TODO NEWTEZ Handle encrypted shuffle
+  /**
+   * Builds the base shuffle URI for a host, port and partition, using this input's
+   * application id and the configured sslShuffle flag.
+   */
+  @VisibleForTesting
+  URI getBaseURI(String host, int port, int partitionId) {
+    String applicationId = inputContext.getApplicationId().toString();
+    StringBuilder base = ShuffleUtils.constructBaseURIForShuffleHandler(host, port,
+      partitionId, applicationId, sslShuffle);
+    return URI.create(base.toString());
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/tez/blob/196bcca9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
new file mode 100644
index 0000000..6fec954
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -0,0 +1,623 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
+
+import java.io.IOException;
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.api.events.InputReadErrorEvent;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.TezRuntimeUtils;
+import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapOutput.Type;
+
+import com.google.common.collect.Lists;
+
+class ShuffleScheduler {
+  // Per-thread timestamp: set in getHost() and read back in freeHost() to log elapsed time.
+  static ThreadLocal<Long> shuffleStart = new ThreadLocal<Long>() {
+    @Override
+    protected Long initialValue() {
+      return Long.valueOf(0L);
+    }
+  };
+
+  private static final Log LOG = LogFactory.getLog(ShuffleScheduler.class);
+  // Base of the host penalty delay computed in copyFailed.
+  private static final long INITIAL_PENALTY = 2000l; // 2 seconds
+  // The penalty delay is INITIAL_PENALTY * PENALTY_GROWTH_RATE^failures.
+  private static final float PENALTY_GROWTH_RATE = 1.3f;
+  
+  // TODO NEWTEZ May need to be a string if attempting to fetch from multiple inputs.
+  // Indexed by input index; an entry becomes true via setInputFinished.
+  private boolean[] finishedMaps;
+  // Total number of inputs, fixed at construction.
+  private final int numInputs;
+  // Inputs not yet finished; decremented in copySucceeded.
+  private int remainingMaps;
+  // Known hosts, keyed by the identifier from MapHost.createIdentifier(hostPort, partitionId).
+  private Map<String, MapHost> mapLocations = new HashMap<String, MapHost>();
+  //TODO NEWTEZ Clean this and other maps at some point
+  // Maps "<pathComponent>_<partitionId>" to the source attempt that produced it.
+  private ConcurrentMap<String, InputAttemptIdentifier> pathToIdentifierMap = new ConcurrentHashMap<String, InputAttemptIdentifier>(); 
+  // Hosts that getHost() may hand out.
+  private Set<MapHost> pendingHosts = new HashSet<MapHost>();
+  // Source attempts reported obsolete via obsoleteInput; skipped by getMapsForHost.
+  private Set<InputAttemptIdentifier> obsoleteInputs = new HashSet<InputAttemptIdentifier>();
+  
+  // Used by getHost() to pick a pending host at random.
+  private final Random random = new Random(System.currentTimeMillis());
+  // Penalized hosts; the Referee thread takes them off once their delay expires.
+  private final DelayQueue<Penalty> penalties = new DelayQueue<Penalty>();
+  private final Referee referee;
+  // Fetch failures per source attempt; the entry is removed when a fetch of it succeeds.
+  private final Map<InputAttemptIdentifier, IntWritable> failureCounts =
+    new HashMap<InputAttemptIdentifier,IntWritable>(); 
+  // Fetch failures per host identifier; the entry is removed when a fetch from it succeeds.
+  private final Map<String,IntWritable> hostFailures = 
+    new HashMap<String,IntWritable>();
+  private final InputContext inputContext;
+  // Receives exceptions reported by this scheduler (shuffle.reportException).
+  private final Shuffle shuffle;
+  private final TezCounter shuffledInputsCounter;
+  private final TezCounter skippedInputCounter;
+  private final TezCounter reduceShuffleBytes;
+  private final TezCounter reduceBytesDecompressed;
+  private final TezCounter failedShuffleCounter;
+  private final TezCounter bytesShuffledToDisk;
+  private final TezCounter bytesShuffledToDiskDirect;
+  private final TezCounter bytesShuffledToMem;
+
+  // Wall-clock time at construction, in milliseconds.
+  private final long startTime;
+  // Wall-clock time of the most recent successful copy registration.
+  private long lastProgressTime;
+
+  // Upper bound on attempts returned by one getMapsForHost call (at least 1).
+  private int maxTaskOutputAtOnce;
+  // Period, in failures per attempt, at which checkAndInformAM reports to the AM.
+  private int maxFetchFailuresBeforeReporting;
+  // When true, read and connect errors are reported to the AM right away.
+  private boolean reportReadErrorImmediately = true; 
+  // Threshold on distinct failed attempts used by checkReducerHealth; capped at numInputs.
+  private int maxFailedUniqueFetches = 5;
+  // Failures of a single attempt at which copyFailed reports an exception to the shuffle.
+  private final int abortFailureLimit;
+  // Largest run duration reported through informMaxMapRunTime.
+  private int maxMapRuntime = 0;
+
+  // Sum of compressed bytes across all successfully registered copies.
+  private long totalBytesShuffledTillNow = 0;
+  // Formats MB/s rates in log messages.
+  private DecimalFormat  mbpsFormat = new DecimalFormat("0.00");
+  
+  /**
+   * Creates the scheduler, reads its tunables from {@code conf} and starts the penalty
+   * referee thread.
+   *
+   * @param inputContext context of the input being shuffled
+   * @param conf configuration from which the fetch-failure and batching limits are read
+   * @param numberOfInputs number of inputs that must finish before the shuffle is done
+   * @param shuffle receiver of exceptions reported by this scheduler
+   * @param shuffledInputsCounter incremented for every input that was actually fetched
+   * @param reduceShuffleBytes incremented with the compressed size of each finished input
+   * @param reduceBytesDecompressed incremented with the decompressed size of each finished input
+   * @param failedShuffleCounter incremented for every failed fetch
+   * @param bytesShuffledToDisk incremented with compressed bytes of DISK outputs
+   * @param bytesShuffledToDiskDirect incremented with compressed bytes of DISK_DIRECT outputs
+   * @param bytesShuffledToMem incremented with compressed bytes of all other outputs
+   */
+  public ShuffleScheduler(InputContext inputContext,
+                          Configuration conf,
+                          int numberOfInputs,
+                          Shuffle shuffle,
+                          TezCounter shuffledInputsCounter,
+                          TezCounter reduceShuffleBytes,
+                          TezCounter reduceBytesDecompressed,
+                          TezCounter failedShuffleCounter,
+                          TezCounter bytesShuffledToDisk,
+                          TezCounter bytesShuffledToDiskDirect,
+                          TezCounter bytesShuffledToMem) {
+    this.inputContext = inputContext;
+    this.numInputs = numberOfInputs;
+    // At least 30 failures, or a tenth of the inputs when that is larger.
+    abortFailureLimit = Math.max(30, numberOfInputs / 10);
+    remainingMaps = numberOfInputs;
+    finishedMaps = new boolean[remainingMaps]; // default init to false
+    // The Referee constructor reads inputContext, so it must be created after that assignment.
+    this.referee = new Referee();
+    this.shuffle = shuffle;
+    this.shuffledInputsCounter = shuffledInputsCounter;
+    this.reduceShuffleBytes = reduceShuffleBytes;
+    this.reduceBytesDecompressed = reduceBytesDecompressed;
+    this.failedShuffleCounter = failedShuffleCounter;
+    this.bytesShuffledToDisk = bytesShuffledToDisk;
+    this.bytesShuffledToDiskDirect = bytesShuffledToDiskDirect;
+    this.bytesShuffledToMem = bytesShuffledToMem;
+    this.startTime = System.currentTimeMillis();
+    this.lastProgressTime = startTime;
+
+    // Never require more distinct failed fetches than there are inputs.
+    this.maxFailedUniqueFetches = Math.min(numberOfInputs,
+        this.maxFailedUniqueFetches);
+    // NOTE(review): the referee thread is started before the constructor has finished.
+    referee.start();
+    this.maxFetchFailuresBeforeReporting = 
+        conf.getInt(
+            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT,
+            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT_DEFAULT);
+    this.reportReadErrorImmediately = 
+        conf.getBoolean(
+            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR, 
+            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR_DEFAULT);
+    // Clamped so that getMapsForHost always hands out at least one attempt.
+    this.maxTaskOutputAtOnce = Math.max(1, conf.getInt(
+            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE,
+            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE_DEFAULT));
+    
+    this.skippedInputCounter = inputContext.getCounters().findCounter(TaskCounter.NUM_SKIPPED_INPUTS);
+
+    LOG.info("ShuffleScheduler running for sourceVertex: "
+        + inputContext.getSourceVertexName() + " with configuration: "
+        + "maxFetchFailuresBeforeReporting=" + maxFetchFailuresBeforeReporting
+        + ", reportReadErrorImmediately=" + reportReadErrorImmediately
+        + ", maxFailedUniqueFetches=" + maxFailedUniqueFetches
+        + ", abortFailureLimit=" + abortFailureLimit
+        + ", maxMapRuntime=" + maxMapRuntime);
+  }
+
+  /**
+   * Registers the completion of an input.
+   * If the input index is not yet finished, the output (when present) is committed, counters
+   * and progress are updated and the input is marked finished; waiters are notified once no
+   * inputs remain. If the input index was already finished, a non-null output is aborted.
+   *
+   * @param srcAttemptIdentifier the source attempt that completed
+   * @param host host the data came from; may be null
+   * @param bytesCompressed compressed size of the fetched data
+   * @param bytesDecompressed decompressed size of the fetched data
+   * @param millis time the fetch took, used only for logging the rate
+   * @param output the fetched output, or null when the input completed without a fetch
+   * @throws IOException declared by the signature; presumably raised by the output's
+   *         commit/abort — TODO confirm
+   */
+  public synchronized void copySucceeded(InputAttemptIdentifier srcAttemptIdentifier, 
+                                         MapHost host,
+                                         long bytesCompressed,
+                                         long bytesDecompressed,
+                                         long millis,
+                                         MapOutput output
+                                         ) throws IOException {
+
+    if (!isInputFinished(srcAttemptIdentifier.getInputIdentifier().getInputIndex())) {
+      if (output != null) {
+
+        // A successful fetch clears the failure history of this attempt and of its host.
+        failureCounts.remove(srcAttemptIdentifier);
+        if (host != null) {
+          hostFailures.remove(host.getHostIdentifier());
+        }
+
+        output.commit();
+        logIndividualFetchComplete(millis, bytesCompressed, bytesDecompressed, output,
+            srcAttemptIdentifier);
+        // Attribute the compressed bytes to the counter matching the output type.
+        if (output.getType() == Type.DISK) {
+          bytesShuffledToDisk.increment(bytesCompressed);
+        } else if (output.getType() == Type.DISK_DIRECT) {
+          bytesShuffledToDiskDirect.increment(bytesCompressed);
+        } else {
+          bytesShuffledToMem.increment(bytesCompressed);
+        }
+        shuffledInputsCounter.increment(1);
+      } else {
+        // Output null implies that a physical input completion is being
+        // registered without needing to fetch data
+        skippedInputCounter.increment(1);
+      }
+      setInputFinished(srcAttemptIdentifier.getInputIdentifier().getInputIndex());
+      
+      // Wake up threads waiting on this scheduler once the last input is in.
+      if (--remainingMaps == 0) {
+        LOG.info("All inputs fetched for input vertex : " + inputContext.getSourceVertexName());
+        notifyAll();
+      }
+
+      // update the status
+      lastProgressTime = System.currentTimeMillis();
+      totalBytesShuffledTillNow += bytesCompressed;
+      logProgress();
+      reduceShuffleBytes.increment(bytesCompressed);
+      reduceBytesDecompressed.increment(bytesDecompressed);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("src task: "
+            + TezRuntimeUtils.getTaskAttemptIdentifier(
+                inputContext.getSourceVertexName(), srcAttemptIdentifier.getInputIdentifier().getInputIndex(),
+                srcAttemptIdentifier.getAttemptNumber()) + " done");
+      }
+    } else {
+      // input is already finished. duplicate fetch.
+      LOG.warn("Duplicate fetch of input no longer needs to be fetched: " + srcAttemptIdentifier);
+      // free the resource - specially memory
+      
+      // If the src does not generate data, output will be null.
+      if (output != null) {
+        output.abort();
+      }
+    }
+    // TODO NEWTEZ Should this be releasing the output, if not committed ? Possible memory leak in case of speculation.
+  }
+
+  /**
+   * Logs size, timing and throughput of one completed fetch.
+   * The rate is reported in MB/s and is 0 when {@code millis} is 0.
+   */
+  private void logIndividualFetchComplete(long millis, long bytesCompressed, long bytesDecompressed,
+                                          MapOutput output,
+                                          InputAttemptIdentifier srcAttemptIdentifier) {
+    // bytes per second first, then scaled down to megabytes per second
+    double rate = (millis == 0)
+        ? 0
+        : (bytesCompressed / ((double) millis / 1000)) / (1024 * 1024);
+    String message = "Completed fetch for attempt: " + srcAttemptIdentifier + " to " + output.getType()
+        + ", CompressedSize=" + bytesCompressed + ", DecompressedSize=" + bytesDecompressed
+        + ",EndTime=" + System.currentTimeMillis() + ", TimeTaken=" + millis + ", Rate="
+        + mbpsFormat.format(rate) + " MB/s";
+    LOG.info(message);
+  }
+
+  /**
+   * Logs how many inputs are done and the cumulative transfer rate since construction.
+   * Elapsed time is whole seconds plus one, so the divisor is never zero.
+   */
+  private void logProgress() {
+    long elapsedSecs = (System.currentTimeMillis() - startTime) / 1000 + 1;
+    int completedInputs = numInputs - remainingMaps;
+    double megabytesFetched = (double) totalBytesShuffledTillNow / (1024 * 1024);
+    double megabytesPerSec = megabytesFetched / elapsedSecs;
+
+    LOG.info("copy(" + completedInputs + " of " + numInputs +
+        ". Transfer rate (CumulativeDataFetched/TimeSinceInputStarted)) "
+        + mbpsFormat.format(megabytesPerSec) + " MB/s)");
+  }
+
+  /**
+   * Records a failed fetch.
+   * Penalizes the host, bumps the per-attempt and per-host failure counts, reports an
+   * exception to the shuffle once the attempt has failed {@code abortFailureLimit} times,
+   * possibly informs the AM, checks overall health and finally queues the host with a delay
+   * that grows with the attempt's failure count.
+   *
+   * @param srcAttempt the source attempt whose fetch failed
+   * @param host the host the fetch was attempted from
+   * @param readError whether the failure was a read error
+   * @param connectError whether the failure was a connect error
+   */
+  public synchronized void copyFailed(InputAttemptIdentifier srcAttempt,
+                                      MapHost host,
+                                      boolean readError,
+                                      boolean connectError) {
+    host.penalize();
+    // Failure count of this attempt including the current failure.
+    int failures = 1;
+    if (failureCounts.containsKey(srcAttempt)) {
+      IntWritable x = failureCounts.get(srcAttempt);
+      x.set(x.get() + 1);
+      failures = x.get();
+    } else {
+      failureCounts.put(srcAttempt, new IntWritable(1));      
+    }
+    String hostPort = host.getHostIdentifier();
+    // TODO TEZ-922 hostFailures isn't really used for anything. Factor it into error
+    // reporting / potential blacklisting of hosts.
+    if (hostFailures.containsKey(hostPort)) {
+      IntWritable x = hostFailures.get(hostPort);
+      x.set(x.get() + 1);
+    } else {
+      hostFailures.put(hostPort, new IntWritable(1));
+    }
+    if (failures >= abortFailureLimit) {
+      // This task has seen too many fetch failures - report it as failed. The
+      // AM may retry it if max failures has not been reached.
+      
+      // Between the task and the AM - someone needs to determine who is at
+      // fault. If there's enough errors seen on the task, before the AM informs
+      // it about source failure, the task considers itself to have failed and
+      // allows the AM to re-schedule it.
+      IOException ioe = new IOException(failures
+            + " failures downloading "
+            + TezRuntimeUtils.getTaskAttemptIdentifier(
+                inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getInputIndex(),
+                srcAttempt.getAttemptNumber()));
+      ioe.fillInStackTrace();
+      shuffle.reportException(ioe);
+    }
+
+    // Processing continues after the report above; this method does not return early.
+    failedShuffleCounter.increment(1);
+    checkAndInformAM(failures, srcAttempt, readError, connectError);
+
+    checkReducerHealth();
+    
+    // Delay is INITIAL_PENALTY * PENALTY_GROWTH_RATE^failures, in milliseconds.
+    long delay = (long) (INITIAL_PENALTY *
+        Math.pow(PENALTY_GROWTH_RATE, failures));
+    
+    // The Referee thread makes the host available again once this penalty expires.
+    penalties.add(new Penalty(host, delay));    
+  }
+
+  /**
+   * Logs a local error and forwards it to the shuffle as the failure cause.
+   *
+   * @param ioe the local error to report
+   */
+  public void reportLocalError(IOException ioe) {
+    LOG.error("Shuffle failed : caused by local error", ioe);
+    shuffle.reportException(ioe);
+  }
+
+  // Notify the AM  
+  // after every read error, if 'reportReadErrorImmediately' is true or
+  // after every 'maxFetchFailuresBeforeReporting' failures
+  private void checkAndInformAM(
+      int failures, InputAttemptIdentifier srcAttempt, boolean readError,
+      boolean connectError) {
+    if ((reportReadErrorImmediately && (readError || connectError))
+        || ((failures % maxFetchFailuresBeforeReporting) == 0)) {
+      LOG.info("Reporting fetch failure for InputIdentifier: " 
+          + srcAttempt + " taskAttemptIdentifier: "
+          + TezRuntimeUtils.getTaskAttemptIdentifier(
+              inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getInputIndex(),
+              srcAttempt.getAttemptNumber()) + " to AM.");
+      List<Event> failedEvents = Lists.newArrayListWithCapacity(1);
+      failedEvents.add(InputReadErrorEvent.create("Fetch failure for "
+          + TezRuntimeUtils.getTaskAttemptIdentifier(
+          inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getInputIndex(),
+          srcAttempt.getAttemptNumber()) + " to jobtracker.", srcAttempt.getInputIdentifier()
+          .getInputIndex(), srcAttempt.getAttemptNumber()));
+
+      inputContext.sendEvents(failedEvents);      
+    }
+  }
+
+  /**
+   * Reports an exception to the shuffle when fetching looks hopeless: enough distinct
+   * attempts have failed, the failure ratio is too high, and the shuffle has either made
+   * too little progress or has been stalled for too long.
+   */
+  private void checkReducerHealth() {
+    final float MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT = 0.5f;
+    final float MIN_REQUIRED_PROGRESS_PERCENT = 0.5f;
+    final float MAX_ALLOWED_STALL_TIME_PERCENT = 0.5f;
+
+    long totalFailures = failedShuffleCounter.getValue();
+    int doneMaps = numInputs - remainingMaps;
+    
+    // Healthy while failures make up less than half of (failures + finished inputs).
+    boolean reducerHealthy =
+      (((float)totalFailures / (totalFailures + doneMaps))
+          < MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT);
+    
+    // check if the reducer has progressed enough
+    boolean reducerProgressedEnough =
+      (((float)doneMaps / numInputs)
+          >= MIN_REQUIRED_PROGRESS_PERCENT);
+
+    // check if the reducer is stalled for a long time
+    // duration for which the reducer is stalled
+    int stallDuration =
+      (int)(System.currentTimeMillis() - lastProgressTime);
+    
+    // duration for which the reducer ran with progress
+    int shuffleProgressDuration =
+      (int)(lastProgressTime - startTime);
+
+    // min time the reducer should run without getting killed
+    int minShuffleRunDuration =
+      (shuffleProgressDuration > maxMapRuntime)
+      ? shuffleProgressDuration
+          : maxMapRuntime;
+    
+    // NOTE(review): minShuffleRunDuration can be 0 here, making this a float division by
+    // zero (Infinity or NaN rather than an exception) — confirm that is acceptable.
+    boolean reducerStalled =
+      (((float)stallDuration / minShuffleRunDuration)
+          >= MAX_ALLOWED_STALL_TIME_PERCENT);
+
+    // kill if not healthy and has insufficient progress
+    if ((failureCounts.size() >= maxFailedUniqueFetches ||
+        failureCounts.size() == (numInputs - doneMaps))
+        && !reducerHealthy
+        && (!reducerProgressedEnough || reducerStalled)) {
+      LOG.fatal("Shuffle failed with too many fetch failures " + "and insufficient progress!"
+          + "failureCounts=" + failureCounts.size() + ", pendingInputs=" + (numInputs - doneMaps)
+          + ", reducerHealthy=" + reducerHealthy + ", reducerProgressedEnough="
+          + reducerProgressedEnough + ", reducerStalled=" + reducerStalled);
+      String errorMsg = "Exceeded MAX_FAILED_UNIQUE_FETCHES; bailing-out.";
+      shuffle.reportException(new IOException(errorMsg));
+    }
+
+  }
+  
+  /**
+   * Registers a source attempt as fetchable from the given host and partition.
+   * A MapHost entry is created on first sight of the host/partition pair; if the host is
+   * in PENDING state after the attempt is added, it is queued and waiting threads are woken.
+   *
+   * @param inputHostName host name serving the output
+   * @param port port on that host
+   * @param partitionId partition to fetch
+   * @param hostUrl base URL used to fetch from the host
+   * @param srcAttempt the source attempt that produced the output
+   */
+  public synchronized void addKnownMapOutput(String inputHostName,
+                                             int port,
+                                             int partitionId,
+                                             String hostUrl,
+                                             InputAttemptIdentifier srcAttempt) {
+    final String hostAndPort = inputHostName + ":" + port;
+    final String hostKey = MapHost.createIdentifier(hostAndPort, partitionId);
+
+    MapHost mapHost = mapLocations.get(hostKey);
+    if (mapHost == null) {
+      // First output seen for this host/partition pair.
+      mapHost = new MapHost(partitionId, hostAndPort, hostUrl);
+      assert hostKey.equals(mapHost.getIdentifier());
+      mapLocations.put(hostKey, mapHost);
+    }
+    mapHost.addKnownMap(srcAttempt);
+
+    // Remember which attempt owns this path so fetched data can be mapped back to it.
+    String pathKey =
+        getIdentifierFromPathAndReduceId(srcAttempt.getPathComponent(), partitionId);
+    pathToIdentifierMap.put(pathKey, srcAttempt);
+
+    // Mark the host as pending
+    if (mapHost.getState() == MapHost.State.PENDING) {
+      pendingHosts.add(mapHost);
+      notifyAll();
+    }
+  }
+  
+  /**
+   * Marks a source attempt as obsolete so that getMapsForHost no longer hands it out.
+   *
+   * @param srcAttempt the attempt to obsolete
+   */
+  public synchronized void obsoleteInput(InputAttemptIdentifier srcAttempt) {
+    // The incoming srcAttempt does not contain a path component.
+    LOG.info("Adding obsolete input: " + srcAttempt);
+    obsoleteInputs.add(srcAttempt);
+  }
+  
+  /**
+   * Returns a source attempt to the host's list of known maps.
+   *
+   * @param host host to which the attempt is added back
+   * @param srcAttempt the attempt to put back
+   */
+  public synchronized void putBackKnownMapOutput(MapHost host,
+                                                 InputAttemptIdentifier srcAttempt) {
+    host.addKnownMap(srcAttempt);
+  }
+
+  /**
+   * Blocks until a pending host exists, then removes a randomly chosen one from the
+   * pending set, marks it busy and records the calling thread's start time.
+   *
+   * @return the host assigned to the calling thread
+   * @throws InterruptedException if interrupted while waiting for a pending host
+   */
+  public synchronized MapHost getHost() throws InterruptedException {
+    while (pendingHosts.isEmpty()) {
+      wait();
+    }
+
+    // Walk the set up to a randomly chosen position; sets offer no indexed access.
+    Iterator<MapHost> candidates = pendingHosts.iterator();
+    int skipCount = random.nextInt(pendingHosts.size());
+    MapHost chosen = candidates.next();
+    for (int skipped = 0; skipped < skipCount; skipped++) {
+      chosen = candidates.next();
+    }
+
+    pendingHosts.remove(chosen);
+    chosen.markBusy();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Assigning " + chosen + " with " + chosen.getNumKnownMapOutputs() +
+          " to " + Thread.currentThread().getName());
+    }
+    // Read back in freeHost() to log how long this thread held the host.
+    shuffleStart.set(System.currentTimeMillis());
+
+    return chosen;
+  }
+  
+  /**
+   * Looks up the source attempt registered for a fetched path and reduce id.
+   *
+   * @param path path component of the fetched output
+   * @param reduceId reduce (partition) id of the fetched output
+   * @return the registered attempt, or null if none is registered under that key
+   */
+  public InputAttemptIdentifier getIdentifierForFetchedOutput(
+      String path, int reduceId) {
+    String lookupKey = getIdentifierFromPathAndReduceId(path, reduceId);
+    return pathToIdentifierMap.get(lookupKey);
+  }
+  
+  /** Returns true when the attempt is neither obsolete nor for an already finished input. */
+  private boolean inputShouldBeConsumed(InputAttemptIdentifier id) {
+    if (obsoleteInputs.contains(id)) {
+      return false;
+    }
+    return !isInputFinished(id.getInputIdentifier().getInputIndex());
+  }
+  
+  /**
+   * Takes all known attempts off the host, drops finished or obsolete ones, keeps only the
+   * highest attempt number per input index, and returns at most {@code maxTaskOutputAtOnce}
+   * of them. Attempts beyond that limit are handed back to the host.
+   *
+   * @param host the host whose known attempts are to be fetched
+   * @return the attempts the caller should fetch now
+   */
+  public synchronized List<InputAttemptIdentifier> getMapsForHost(MapHost host) {
+    // Newest usable attempt per input index, kept in first-seen order.
+    Map<Integer, InputAttemptIdentifier> newestByInput =
+        new LinkedHashMap<Integer, InputAttemptIdentifier>();
+    for (InputAttemptIdentifier candidate : host.getAndClearKnownMaps()) {
+      // we may want to try all versions of the input but with current retry
+      // behavior older ones are likely to be lost and should be ignored.
+      // This may be removed after TEZ-914
+      if (!inputShouldBeConsumed(candidate)) {
+        LOG.info("Ignoring finished or obsolete source: " + candidate);
+        continue;
+      }
+      Integer inputNumber = Integer.valueOf(candidate.getInputIdentifier().getInputIndex());
+      InputAttemptIdentifier previous = newestByInput.get(inputNumber);
+      if (previous != null && previous.getAttemptNumber() >= candidate.getAttemptNumber()) {
+        // An equal or newer attempt is already recorded for this input.
+        continue;
+      }
+      newestByInput.put(inputNumber, candidate);
+      if (previous != null) {
+        LOG.warn("Old Src for InputIndex: " + inputNumber + " with attemptNumber: "
+            + previous.getAttemptNumber()
+            + " was not determined to be invalid. Ignoring it for now in favour of "
+            + candidate.getAttemptNumber());
+      }
+    }
+
+    // Compute the final list, limited by maxTaskOutputAtOnce
+    List<InputAttemptIdentifier> result = new ArrayList<InputAttemptIdentifier>();
+    int totalSize = newestByInput.size();
+    int includedMaps = 0;
+    Iterator<InputAttemptIdentifier> remaining = newestByInput.values().iterator();
+    // find the maps that we still need, up to the limit
+    while (remaining.hasNext()) {
+      result.add(remaining.next());
+      if (++includedMaps >= maxTaskOutputAtOnce) {
+        break;
+      }
+    }
+
+    // put back the maps left after the limit
+    while (remaining.hasNext()) {
+      host.addKnownMap(remaining.next());
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("assigned " + includedMaps + " of " + totalSize + " to " +
+          host + " to " + Thread.currentThread().getName());
+    }
+    return result;
+  }
+
+  public synchronized void freeHost(MapHost host) {
+    if (host.getState() != MapHost.State.PENALIZED) {
+      if (host.markAvailable() == MapHost.State.PENDING) {
+        pendingHosts.add(host);
+        notifyAll();
+      }
+    }
+    LOG.info(host + " freed by " + Thread.currentThread().getName() + " in " + 
+             (System.currentTimeMillis()-shuffleStart.get()) + "ms");
+  }
+
+  /** Forgets all known hosts, pending hosts, obsolete inputs and path-to-attempt mappings. */
+  public synchronized void resetKnownMaps() {
+    pathToIdentifierMap.clear();
+    pendingHosts.clear();
+    obsoleteInputs.clear();
+    mapLocations.clear();
+  }
+
+  /**
+   * Utility method to check if the Shuffle data fetch is complete.
+   * @return true when no inputs remain to be finished
+   */
+  public synchronized boolean isDone() {
+    return remainingMaps == 0;
+  }
+
+  /**
+   * Waits for the shuffle to finish, for at most one wait of {@code millis}.
+   * The wait is not re-armed, so this may return false before the full timeout elapses
+   * if the scheduler is notified for another reason.
+   *
+   * @param millis maximum wait time
+   * @return true if the shuffle is done
+   * @throws InterruptedException if interrupted while waiting
+   */
+  public synchronized boolean waitUntilDone(int millis
+                                            ) throws InterruptedException {
+    if (remainingMaps <= 0) {
+      return true;
+    }
+    wait(millis);
+    return remainingMaps == 0;
+  }
+  
+  /**
+   * Pairs a penalized host with the wall-clock time at which its penalty ends.
+   * Instances are ordered by that end time.
+   */
+  private static class Penalty implements Delayed {
+    MapHost host;
+    private long endTime;
+
+    Penalty(MapHost host, long delay) {
+      this.endTime = System.currentTimeMillis() + delay;
+      this.host = host;
+    }
+
+    /** Remaining penalty time converted to {@code unit}; not positive once expired. */
+    public long getDelay(TimeUnit unit) {
+      return unit.convert(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+    }
+
+    /** Orders penalties by end time; {@code o} must be another Penalty. */
+    public int compareTo(Delayed o) {
+      long otherEnd = ((Penalty) o).endTime;
+      if (endTime < otherEnd) {
+        return -1;
+      }
+      if (endTime > otherEnd) {
+        return 1;
+      }
+      return 0;
+    }
+
+  }
+  
+  private String getIdentifierFromPathAndReduceId(String path, int reduceId) {
+    return path + "_" + reduceId;
+  }
+  
+  /**
+   * A thread that takes hosts off of the penalty list when the timer expires.
+   * It runs as a daemon and is named after the source vertex of the enclosing scheduler.
+   */
+  private class Referee extends Thread {
+    public Referee() {
+      setName("ShufflePenaltyReferee ["
+          + TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName()) + "]");
+      setDaemon(true);
+    }
+
+    /**
+     * Loops until interrupted: waits for the next expired penalty, marks its host
+     * available and, if the host is then PENDING, queues it and wakes waiting threads.
+     */
+    public void run() {
+      try {
+        while (true) {
+          // take the first host that has an expired penalty
+          MapHost host = penalties.take().host;
+          // State changes are made under the scheduler's own monitor.
+          synchronized (ShuffleScheduler.this) {
+            if (host.markAvailable() == MapHost.State.PENDING) {
+              pendingHosts.add(host);
+              ShuffleScheduler.this.notifyAll();
+            }
+          }
+        }
+      } catch (InterruptedException ie) {
+        // Interruption (see close()) ends the thread.
+        return;
+      } catch (Throwable t) {
+        // Anything else is reported to the shuffle and also ends the thread.
+        shuffle.reportException(t);
+      }
+    }
+  }
+  
+  /**
+   * Stops the penalty referee thread and waits for it to exit.
+   *
+   * @throws InterruptedException if interrupted while waiting for the referee to finish
+   */
+  public void close() throws InterruptedException {
+    // TODO: does this thread need to interrupt itself as well?
+    referee.interrupt();
+    referee.join();
+  }
+
+  /**
+   * Raises the recorded maximum run time if {@code duration} exceeds it.
+   *
+   * @param duration a newly observed run duration
+   */
+  public synchronized void informMaxMapRunTime(int duration) {
+    maxMapRuntime = Math.max(maxMapRuntime, duration);
+  }
+  
+  /** Marks the input at {@code inputIndex} as finished, guarded by the finishedMaps monitor. */
+  void setInputFinished(int inputIndex) {
+    synchronized(finishedMaps) {
+      finishedMaps[inputIndex] = true;
+    }
+  }
+  
+  /** Returns whether the input at {@code inputIndex} is finished, guarded by the finishedMaps monitor. */
+  boolean isInputFinished(int inputIndex) {
+    synchronized (finishedMaps) {
+      return finishedMaps[inputIndex];      
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/196bcca9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/package-info.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/package-info.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/package-info.java
new file mode 100644
index 0000000..1d32101
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+@Private
+package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/196bcca9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/package-info.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/package-info.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/package-info.java
new file mode 100644
index 0000000..974932e
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+@Private
+package org.apache.tez.runtime.library.common.shuffle;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/196bcca9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
index bb8c912..f100f23 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
@@ -51,7 +51,7 @@ import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.ConfigUtils;
 import org.apache.tez.runtime.library.common.TezRuntimeUtils;
 import org.apache.tez.runtime.library.common.combine.Combiner;
-import org.apache.tez.runtime.library.common.shuffle.impl.ShuffleHeader;
+import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader;
 import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
 import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
 import org.apache.tez.runtime.library.hadoop.compat.NullProgressable;

http://git-wip-us.apache.org/repos/asf/tez/blob/196bcca9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index 50664f5..1ba00a0 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -61,7 +61,7 @@ import org.apache.tez.runtime.library.common.sort.impl.IFile;
 import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
 import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
 import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
-import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
 
 import com.google.common.annotations.VisibleForTesting;

http://git-wip-us.apache.org/repos/asf/tez/blob/196bcca9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
index fe85a99..4abc6ce 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
@@ -47,7 +47,7 @@ import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.ConfigUtils;
 import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
 import org.apache.tez.runtime.library.common.ValuesIterator;
-import org.apache.tez.runtime.library.common.shuffle.impl.Shuffle;
+import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.Shuffle;
 import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
 
 import com.google.common.base.Preconditions;

http://git-wip-us.apache.org/repos/asf/tez/blob/196bcca9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
index 75fa64a..1444e08 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
@@ -45,10 +45,10 @@ import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.ConfigUtils;
 import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
 import org.apache.tez.runtime.library.common.readers.UnorderedKVReader;
-import org.apache.tez.runtime.library.shuffle.common.ShuffleEventHandler;
-import org.apache.tez.runtime.library.shuffle.common.impl.ShuffleInputEventHandlerImpl;
-import org.apache.tez.runtime.library.shuffle.common.impl.ShuffleManager;
-import org.apache.tez.runtime.library.shuffle.common.impl.SimpleFetchedInputAllocator;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleEventHandler;
+import org.apache.tez.runtime.library.common.shuffle.impl.ShuffleInputEventHandlerImpl;
+import org.apache.tez.runtime.library.common.shuffle.impl.ShuffleManager;
+import org.apache.tez.runtime.library.common.shuffle.impl.SimpleFetchedInputAllocator;
 
 import com.google.common.base.Preconditions;
 

http://git-wip-us.apache.org/repos/asf/tez/blob/196bcca9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
index 40e99ad..9cd7fcd 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
@@ -53,7 +53,7 @@ import org.apache.tez.runtime.library.common.sort.impl.PipelinedSorter;
 import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
 import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
 import org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter;
-import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto;
 

http://git-wip-us.apache.org/repos/asf/tez/blob/196bcca9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
index 76dab45..9c701aa 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
@@ -46,7 +46,7 @@ import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.library.api.KeyValuesWriter;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.broadcast.output.FileBasedKVWriter;
-import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataProto;
 

http://git-wip-us.apache.org/repos/asf/tez/blob/196bcca9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java
deleted file mode 100644
index b0b911b..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.runtime.library.shuffle.common;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.Path;
-import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
-import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
-
-import com.google.common.base.Preconditions;
-
-public class DiskFetchedInput extends FetchedInput {
-
-  private static final Log LOG = LogFactory.getLog(DiskFetchedInput.class);
-  
-  private final FileSystem localFS;
-  private final Path tmpOutputPath;
-  private final Path outputPath;
-
-  public DiskFetchedInput(long actualSize, long compressedSize,
-      InputAttemptIdentifier inputAttemptIdentifier,
-      FetchedInputCallback callbackHandler, Configuration conf,
-      LocalDirAllocator localDirAllocator, TezTaskOutputFiles filenameAllocator)
-      throws IOException {
-    super(Type.DISK, actualSize, compressedSize, inputAttemptIdentifier, callbackHandler);
-
-    this.localFS = FileSystem.getLocal(conf);
-    this.outputPath = filenameAllocator.getInputFileForWrite(
-        this.inputAttemptIdentifier.getInputIdentifier().getInputIndex(), actualSize);
-    // Files are not clobbered due to the id being appended to the outputPath in the tmpPath,
-    // otherwise fetches for the same task but from different attempts would clobber each other.
-    this.tmpOutputPath = outputPath.suffix(String.valueOf(id));
-  }
-
-  @Override
-  public OutputStream getOutputStream() throws IOException {
-    return localFS.create(tmpOutputPath);
-  }
-
-  @Override
-  public InputStream getInputStream() throws IOException {
-    return localFS.open(outputPath);
-  }
-
-  public final Path getInputPath() {
-    if (state == State.COMMITTED) {
-      return this.outputPath;
-    }
-    return this.tmpOutputPath;
-  }
-  
-  @Override
-  public void commit() throws IOException {
-    if (state == State.PENDING) {
-      state = State.COMMITTED;
-      localFS.rename(tmpOutputPath, outputPath);
-      notifyFetchComplete();
-    }
-  }
-
-  @Override
-  public void abort() throws IOException {
-    if (state == State.PENDING) {
-      state = State.ABORTED;
-      // TODO NEWTEZ Maybe defer this to container cleanup
-      localFS.delete(tmpOutputPath, false);
-      notifyFetchFailure();
-    }
-  }
-  
-  @Override
-  public void free() {
-    Preconditions.checkState(
-        state == State.COMMITTED || state == State.ABORTED,
-        "FetchedInput can only be freed after it is committed or aborted");
-    if (state == State.COMMITTED) {
-      state = State.FREED;
-      try {
-        // TODO NEWTEZ Maybe defer this to container cleanup
-        localFS.delete(outputPath, false);
-      } catch (IOException e) {
-        // Ignoring the exception, will eventually be cleaned by container
-        // cleanup.
-        LOG.warn("Failed to remvoe file : " + outputPath.toString());
-      }
-      notifyFreedResource();
-    }
-  }
-
-  @Override
-  public String toString() {
-    return "DiskFetchedInput [outputPath=" + outputPath
-        + ", inputAttemptIdentifier=" + inputAttemptIdentifier
-        + ", actualSize=" + actualSize + ",compressedSize=" + compressedSize
-        + ", type=" + type + ", id=" + id + ", state=" + state + "]";
-  }
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/196bcca9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchResult.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchResult.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchResult.java
deleted file mode 100644
index df38b07..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchResult.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.runtime.library.shuffle.common;
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
-
-public class FetchResult {
-
-  private final String host;
-  private final int port;
-  private final int partition;
-  private final Iterable<InputAttemptIdentifier> pendingInputs;
-
-  public FetchResult(String host, int port, int partition,
-      Iterable<InputAttemptIdentifier> pendingInputs) {
-    this.host = host;
-    this.port = port;
-    this.partition = partition;
-    this.pendingInputs = pendingInputs;
-  }
-
-  public String getHost() {
-    return host;
-  }
-
-  public int getPort() {
-    return port;
-  }
-
-  public int getPartition() {
-    return partition;
-  }
-
-  public Iterable<InputAttemptIdentifier> getPendingInputs() {
-    return pendingInputs;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/196bcca9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInput.java
deleted file mode 100644
index 0a83dc9..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInput.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.runtime.library.shuffle.common;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
-
-@Private
-public abstract class FetchedInput {
-  
-  public static enum Type {
-    WAIT, // TODO NEWTEZ Implement this, only if required.
-    MEMORY,
-    DISK,
-    DISK_DIRECT
-  }
-  
-  protected static enum State {
-    PENDING, COMMITTED, ABORTED, FREED
-  }
-
-  private static AtomicInteger ID_GEN = new AtomicInteger(0);
-
-  protected InputAttemptIdentifier inputAttemptIdentifier;
-  protected final long actualSize;
-  protected final long compressedSize;
-  protected final Type type;
-  protected final FetchedInputCallback callback;
-  protected final int id;
-  protected State state;
-
-  public FetchedInput(Type type, long actualSize, long compressedSize,
-      InputAttemptIdentifier inputAttemptIdentifier,
-      FetchedInputCallback callbackHandler) {
-    this.type = type;
-    this.actualSize = actualSize;
-    this.compressedSize = compressedSize;
-    this.inputAttemptIdentifier = inputAttemptIdentifier;
-    this.callback = callbackHandler;
-    this.id = ID_GEN.getAndIncrement();
-    this.state = State.PENDING;
-  }
-
-  public Type getType() {
-    return this.type;
-  }
-
-  public long getActualSize() {
-    return this.actualSize;
-  }
-  
-  public long getCompressedSize() {
-    return this.compressedSize;
-  }
-
-  public InputAttemptIdentifier getInputAttemptIdentifier() {
-    return this.inputAttemptIdentifier;
-  }
-
-  /**
-   * Inform the Allocator about a committed resource.
-   * This should be called by commit
-   */
-  public void notifyFetchComplete() {
-    this.callback.fetchComplete(this);
-  }
-  
-  /**
-   * Inform the Allocator about a failed resource.
-   * This should be called by abort
-   */
-  public void notifyFetchFailure() {
-    this.callback.fetchFailed(this);
-  }
-  
-  /**
-   * Inform the Allocator about a completed resource being released.
-   * This should be called by free
-   */
-  public void notifyFreedResource() {
-    this.callback.freeResources(this);
-  }
-  
-  /**
-   * Returns the output stream to be used to write fetched data. Users are
-   * expected to close the OutputStream when they're done
-   */
-  public abstract OutputStream getOutputStream() throws IOException;
-
-  /**
-   * Return an input stream to be used to read the previously fetched data.
-   * All calls to getInputStream() produce new reset streams for reading.
-   * Users are expected to close the InputStream when they're done.
-   */
-  public abstract InputStream getInputStream() throws IOException;
-
-  /**
-   * Commit the output. Should be idempotent
-   */
-  public abstract void commit() throws IOException;
-
-  /**
-   * Abort the output. Should be idempotent
-   */
-  public abstract void abort() throws IOException;
-
-  /**
-   * Called when this input has been consumed, so that resources can be
-   * reclaimed.
-   */
-  public abstract void free();
-  
-  @Override
-  public int hashCode() {
-    return id;
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj)
-      return true;
-    if (obj == null)
-      return false;
-    if (getClass() != obj.getClass())
-      return false;
-    FetchedInput other = (FetchedInput) obj;
-    if (id != other.id)
-      return false;
-    return true;
-  }
-}


Mime
View raw message