tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [2/9] TEZ-1479. Disambiguate (refactor) between ShuffleInputEventHandlers and Fetchers. (sseth) (cherry picked from commit 7be5830a908602ff91a07d3020f2dddf7705d48f)
Date Wed, 15 Oct 2014 18:59:38 GMT
http://git-wip-us.apache.org/repos/asf/tez/blob/196bcca9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
deleted file mode 100644
index ddf98b6..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
+++ /dev/null
@@ -1,804 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.runtime.library.shuffle.common.impl;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.text.DecimalFormat;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-
-import javax.crypto.SecretKey;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RawLocalFileSystem;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.tez.common.TezRuntimeFrameworkConfigs;
-import org.apache.tez.common.TezUtilsInternal;
-import org.apache.tez.common.counters.TaskCounter;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.dag.api.TezConstants;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.InputContext;
-import org.apache.tez.runtime.api.events.InputReadErrorEvent;
-import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
-import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
-import org.apache.tez.runtime.library.common.InputIdentifier;
-import org.apache.tez.runtime.library.common.TezRuntimeUtils;
-import org.apache.tez.runtime.library.shuffle.common.FetchResult;
-import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
-import org.apache.tez.runtime.library.shuffle.common.FetchedInputAllocator;
-import org.apache.tez.runtime.library.shuffle.common.Fetcher;
-import org.apache.tez.runtime.library.shuffle.common.FetcherCallback;
-import org.apache.tez.runtime.library.shuffle.common.HttpConnection;
-import org.apache.tez.runtime.library.shuffle.common.HttpConnection.HttpConnectionParams;
-import org.apache.tez.runtime.library.shuffle.common.InputHost;
-import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
-import org.apache.tez.runtime.library.shuffle.common.FetchedInput.Type;
-import org.apache.tez.runtime.library.shuffle.common.Fetcher.FetcherBuilder;
-
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-// This only knows how to deal with a single srcIndex for a given targetIndex.
-// In case the src task generates multiple outputs for the same target Index
-// (multiple src-indices), modifications will be required.
-public class ShuffleManager implements FetcherCallback {
-
-  private static final Log LOG = LogFactory.getLog(ShuffleManager.class);
-  
-  private final InputContext inputContext;
-  private final int numInputs;
-
-  private final DecimalFormat mbpsFormat = new DecimalFormat("0.00");
-
-  private final FetchedInputAllocator inputManager;
-
-  private final ListeningExecutorService fetcherExecutor;
-
-  private final ListeningExecutorService schedulerExecutor;
-  private final RunShuffleCallable schedulerCallable;
-  
-  private final BlockingQueue<FetchedInput> completedInputs;
-  private final AtomicBoolean inputReadyNotificationSent = new AtomicBoolean(false);
-  private final Set<InputIdentifier> completedInputSet;
-  private final ConcurrentMap<String, InputHost> knownSrcHosts;
-  private final BlockingQueue<InputHost> pendingHosts;
-  private final Set<InputAttemptIdentifier> obsoletedInputs;
-  private Set<Fetcher> runningFetchers;
-  
-  private final AtomicInteger numCompletedInputs = new AtomicInteger(0);
-  
-  private final long startTime;
-  private long lastProgressTime;
-  private long totalBytesShuffledTillNow;
-
-  // Required to be held when manipulating pendingHosts
-  private final ReentrantLock lock = new ReentrantLock();
-  private final Condition wakeLoop = lock.newCondition();
-  
-  private final int numFetchers;
-  
-  // Parameters required by Fetchers
-  private final SecretKey shuffleSecret;
-  private final CompressionCodec codec;
-  private final boolean localDiskFetchEnabled;
-  private final boolean sharedFetchEnabled;
-  
-  private final int ifileBufferSize;
-  private final boolean ifileReadAhead;
-  private final int ifileReadAheadLength;
-  
-  private final String srcNameTrimmed; 
-  
-  private final AtomicBoolean isShutdown = new AtomicBoolean(false);
-
-  private final TezCounter shuffledInputsCounter;
-  private final TezCounter failedShufflesCounter;
-  private final TezCounter bytesShuffledCounter;
-  private final TezCounter decompressedDataSizeCounter;
-  private final TezCounter bytesShuffledToDiskCounter;
-  private final TezCounter bytesShuffledToMemCounter;
-  private final TezCounter bytesShuffledDirectDiskCounter;
-  
-  private volatile Throwable shuffleError;
-  private final HttpConnectionParams httpConnectionParams;
-  
-
-  private final LocalDirAllocator localDirAllocator;
-  private final RawLocalFileSystem localFs;
-  private final Path[] localDisks;
-  private final static String localhostName = NetUtils.getHostname();
-
-  // TODO More counters - FetchErrors, speed?
-  
-  public ShuffleManager(InputContext inputContext, Configuration conf, int numInputs,
-      int bufferSize, boolean ifileReadAheadEnabled, int ifileReadAheadLength,
-      CompressionCodec codec, FetchedInputAllocator inputAllocator) throws IOException {
-    this.inputContext = inputContext;
-    this.numInputs = numInputs;
-    
-    this.shuffledInputsCounter = inputContext.getCounters().findCounter(TaskCounter.NUM_SHUFFLED_INPUTS);
-    this.failedShufflesCounter = inputContext.getCounters().findCounter(TaskCounter.NUM_FAILED_SHUFFLE_INPUTS);
-    this.bytesShuffledCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES);
-    this.decompressedDataSizeCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_DECOMPRESSED);
-    this.bytesShuffledToDiskCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_TO_DISK);
-    this.bytesShuffledToMemCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_TO_MEM);
-    this.bytesShuffledDirectDiskCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_DISK_DIRECT);
-  
-    this.ifileBufferSize = bufferSize;
-    this.ifileReadAhead = ifileReadAheadEnabled;
-    this.ifileReadAheadLength = ifileReadAheadLength;
-    this.codec = codec;
-    this.inputManager = inputAllocator;
-    this.localDiskFetchEnabled = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH,
-        TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH_DEFAULT);
-    this.sharedFetchEnabled = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH,
-        TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH_DEFAULT);
-    
-    this.srcNameTrimmed = TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName());
-  
-    completedInputSet = Collections.newSetFromMap(new ConcurrentHashMap<InputIdentifier, Boolean>(numInputs));
-    completedInputs = new LinkedBlockingQueue<FetchedInput>(numInputs);
-    knownSrcHosts = new ConcurrentHashMap<String, InputHost>();
-    pendingHosts = new LinkedBlockingQueue<InputHost>();
-    obsoletedInputs = Collections.newSetFromMap(new ConcurrentHashMap<InputAttemptIdentifier, Boolean>());
-    runningFetchers = Collections.newSetFromMap(new ConcurrentHashMap<Fetcher, Boolean>());
-
-    int maxConfiguredFetchers = 
-        conf.getInt(
-            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES, 
-            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES_DEFAULT);
-    
-    this.numFetchers = Math.min(maxConfiguredFetchers, numInputs);
-    
-    ExecutorService fetcherRawExecutor = Executors.newFixedThreadPool(
-        numFetchers,
-        new ThreadFactoryBuilder().setDaemon(true)
-            .setNameFormat("Fetcher [" + srcNameTrimmed + "] #%d " + localhostName).build());
-    this.fetcherExecutor = MoreExecutors.listeningDecorator(fetcherRawExecutor);
-    
-    ExecutorService schedulerRawExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
-        .setDaemon(true).setNameFormat("ShuffleRunner [" + srcNameTrimmed + "]").build());
-    this.schedulerExecutor = MoreExecutors.listeningDecorator(schedulerRawExecutor);
-    this.schedulerCallable = new RunShuffleCallable(conf);
-    
-    this.startTime = System.currentTimeMillis();
-    this.lastProgressTime = startTime;
-    
-    this.shuffleSecret = ShuffleUtils
-        .getJobTokenSecretFromTokenBytes(inputContext
-            .getServiceConsumerMetaData(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID));
-    httpConnectionParams =
-        ShuffleUtils.constructHttpShuffleConnectionParams(conf);
-
-    this.localFs = (RawLocalFileSystem) FileSystem.getLocal(conf).getRaw();
-
-    this.localDirAllocator = new LocalDirAllocator(
-        TezRuntimeFrameworkConfigs.LOCAL_DIRS);
-
-    this.localDisks = Iterables.toArray(
-        localDirAllocator.getAllLocalPathsToRead(".", conf), Path.class);
-
-    Arrays.sort(this.localDisks);
-
-    LOG.info(this.getClass().getSimpleName() + " : numInputs=" + numInputs + ", compressionCodec="
-        + (codec == null ? "NoCompressionCodec" : codec.getClass().getName()) + ", numFetchers="
-        + numFetchers + ", ifileBufferSize=" + ifileBufferSize + ", ifileReadAheadEnabled="
-        + ifileReadAhead + ", ifileReadAheadLength=" + ifileReadAheadLength +", "
-        + "localDiskFetchEnabled=" + localDiskFetchEnabled + ", "
-        + "sharedFetchEnabled=" + sharedFetchEnabled + ", "
-        + httpConnectionParams.toString());
-  }
-
-  public void run() throws IOException {
-    Preconditions.checkState(inputManager != null, "InputManager must be configured");
-
-    ListenableFuture<Void> runShuffleFuture = schedulerExecutor.submit(schedulerCallable);
-    Futures.addCallback(runShuffleFuture, new SchedulerFutureCallback());
-    // Shutdown this executor once this task, and the callback complete.
-    schedulerExecutor.shutdown();
-  }
-  
-  private class RunShuffleCallable implements Callable<Void> {
-
-    private final Configuration conf;
-
-    public RunShuffleCallable(Configuration conf) {
-      this.conf = conf;
-    }
-
-    @Override
-    public Void call() throws Exception {
-      while (!isShutdown.get() && numCompletedInputs.get() < numInputs) {
-        lock.lock();
-        try {
-          if (runningFetchers.size() >= numFetchers || pendingHosts.isEmpty()) {
-            if (numCompletedInputs.get() < numInputs) {
-              wakeLoop.await();
-            }
-          }
-        } finally {
-          lock.unlock();
-        }
-
-        if (shuffleError != null) {
-          // InputContext has already been informed of a fatal error. Relying on
-          // tez to kill the task.
-          break;
-        }
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("NumCompletedInputs: " + numCompletedInputs);
-        }
-        if (numCompletedInputs.get() < numInputs && !isShutdown.get()) {
-          lock.lock();
-          try {
-            int maxFetchersToRun = numFetchers - runningFetchers.size();
-            int count = 0;
-            while (pendingHosts.peek() != null && !isShutdown.get()) {
-              InputHost inputHost = null;
-              try {
-                inputHost = pendingHosts.take();
-              } catch (InterruptedException e) {
-                if (isShutdown.get()) {
-                  LOG.info("Interrupted and hasBeenShutdown, Breaking out of ShuffleScheduler Loop");
-                  break;
-                } else {
-                  throw e;
-                }
-              }
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Processing pending host: " + inputHost.toDetailedString());
-              }
-              if (inputHost.getNumPendingInputs() > 0 && !isShutdown.get()) {
-                LOG.info("Scheduling fetch for inputHost: " + inputHost.getIdentifier());
-                Fetcher fetcher = constructFetcherForHost(inputHost, conf);
-                runningFetchers.add(fetcher);
-                if (isShutdown.get()) {
-                  LOG.info("hasBeenShutdown, Breaking out of ShuffleScheduler Loop");
-                }
-                ListenableFuture<FetchResult> future = fetcherExecutor
-                    .submit(fetcher);
-                Futures.addCallback(future, new FetchFutureCallback(fetcher));
-                if (++count >= maxFetchersToRun) {
-                  break;
-                }
-              } else {
-                if (LOG.isDebugEnabled()) {
-                  LOG.debug("Skipping host: " + inputHost.getIdentifier()
-                      + " since it has no inputs to process");
-                }
-              }
-            }
-          } finally {
-            lock.unlock();
-          }
-        }
-      }
-      LOG.info("Shutting down FetchScheduler, Was Interrupted: " + Thread.currentThread().isInterrupted());
-      // TODO NEWTEZ Maybe clean up inputs.
-      if (!fetcherExecutor.isShutdown()) {
-        fetcherExecutor.shutdownNow();
-      }
-      return null;
-    }
-  }
-  
-  private Fetcher constructFetcherForHost(InputHost inputHost, Configuration conf) {
-
-    Path lockDisk = null;
-
-    if (sharedFetchEnabled) {
-      // pick a single lock disk from the edge name's hashcode + host hashcode
-      final int h = Math.abs(Objects.hashCode(this.srcNameTrimmed, inputHost.getHost()));
-      lockDisk = new Path(this.localDisks[h % this.localDisks.length], "locks");
-    }
-
-    FetcherBuilder fetcherBuilder = new FetcherBuilder(ShuffleManager.this,
-      httpConnectionParams, inputManager, inputContext.getApplicationId(),
-        shuffleSecret, srcNameTrimmed, conf, localFs, localDirAllocator,
-        lockDisk, localDiskFetchEnabled, sharedFetchEnabled);
-
-    if (codec != null) {
-      fetcherBuilder.setCompressionParameters(codec);
-    }
-    fetcherBuilder.setIFileParams(ifileReadAhead, ifileReadAheadLength);
-
-    // Remove obsolete inputs from the list being given to the fetcher. Also
-    // remove from the obsolete list.
-    List<InputAttemptIdentifier> pendingInputsForHost = inputHost
-        .clearAndGetPendingInputs();
-    for (Iterator<InputAttemptIdentifier> inputIter = pendingInputsForHost
-        .iterator(); inputIter.hasNext();) {
-      InputAttemptIdentifier input = inputIter.next();
-      // Avoid adding attempts which have already completed.
-      if (completedInputSet.contains(input.getInputIdentifier())) {
-        inputIter.remove();
-        continue;
-      }
-      // Avoid adding attempts which have been marked as OBSOLETE 
-      if (obsoletedInputs.contains(input)) {
-        inputIter.remove();
-      }
-    }
-    // TODO NEWTEZ Maybe limit the number of inputs being given to a single
-    // fetcher, especially in the case where #hosts < #fetchers
-    fetcherBuilder.assignWork(inputHost.getHost(), inputHost.getPort(),
-        inputHost.getSrcPhysicalIndex(), pendingInputsForHost);
-    LOG.info("Created Fetcher for host: " + inputHost.getHost()
-        + ", with inputs: " + pendingInputsForHost);
-    return fetcherBuilder.build();
-  }
-  
-  /////////////////// Methods for InputEventHandler
-  
-  public void addKnownInput(String hostName, int port,
-      InputAttemptIdentifier srcAttemptIdentifier, int srcPhysicalIndex) {
-    String identifier = InputHost.createIdentifier(hostName, port);
-    InputHost host = knownSrcHosts.get(identifier);
-    if (host == null) {
-      host = new InputHost(hostName, port, inputContext.getApplicationId(), srcPhysicalIndex);
-      assert identifier.equals(host.getIdentifier());
-      InputHost old = knownSrcHosts.putIfAbsent(identifier, host);
-      if (old != null) {
-        host = old;
-      }
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Adding input: " + srcAttemptIdentifier + ", to host: " + host);
-    }
-    host.addKnownInput(srcAttemptIdentifier);
-    lock.lock();
-    try {
-      boolean added = pendingHosts.offer(host);
-      if (!added) {
-        String errorMessage = "Unable to add host: " + host.getIdentifier() + " to pending queue";
-        LOG.error(errorMessage);
-        throw new TezUncheckedException(errorMessage);
-      }
-      wakeLoop.signal();
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  public void addCompletedInputWithNoData(
-      InputAttemptIdentifier srcAttemptIdentifier) {
-    InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
-    LOG.info("No input data exists for SrcTask: " + inputIdentifier + ". Marking as complete.");
-    
-    if (!completedInputSet.contains(inputIdentifier)) {
-      synchronized (completedInputSet) {
-        if (!completedInputSet.contains(inputIdentifier)) {
-          registerCompletedInput(new NullFetchedInput(srcAttemptIdentifier));
-        }
-      }
-    }
-
-    // Awake the loop to check for termination.
-    lock.lock();
-    try {
-      wakeLoop.signal();
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  public void addCompletedInputWithData(
-      InputAttemptIdentifier srcAttemptIdentifier, FetchedInput fetchedInput)
-      throws IOException {
-    InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
-
-    LOG.info("Received Data via Event: " + srcAttemptIdentifier + " to "
-        + fetchedInput.getType());
-    // Count irrespective of whether this is a copy of an already fetched input
-    lock.lock();
-    try {
-      lastProgressTime = System.currentTimeMillis();
-    } finally {
-      lock.unlock();
-    }
-
-    boolean committed = false;
-    if (!completedInputSet.contains(inputIdentifier)) {
-      synchronized (completedInputSet) {
-        if (!completedInputSet.contains(inputIdentifier)) {
-          fetchedInput.commit();
-          committed = true;
-          registerCompletedInput(fetchedInput);
-        }
-      }
-    }
-    if (!committed) {
-      fetchedInput.abort(); // If this fails, the fetcher may attempt another
-                            // abort.
-    } else {
-      lock.lock();
-      try {
-        // Signal the wakeLoop to check for termination.
-        wakeLoop.signal();
-      } finally {
-        lock.unlock();
-      }
-    }
-  }
-
-  public synchronized void obsoleteKnownInput(InputAttemptIdentifier srcAttemptIdentifier) {
-    obsoletedInputs.add(srcAttemptIdentifier);
-    // TODO NEWTEZ Maybe inform the fetcher about this. For now, this is used during the initial fetch list construction.
-  }
-
-  /////////////////// End of Methods for InputEventHandler
-  /////////////////// Methods from FetcherCallbackHandler
-  
-  @Override
-  public void fetchSucceeded(String host, InputAttemptIdentifier srcAttemptIdentifier,
-      FetchedInput fetchedInput, long fetchedBytes, long decompressedLength, long copyDuration)
-      throws IOException {
-    InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
-
-    // Count irrespective of whether this is a copy of an already fetched input
-    lock.lock();
-    try {
-      lastProgressTime = System.currentTimeMillis();
-    } finally {
-      lock.unlock();
-    }
-    
-    boolean committed = false;
-    if (!completedInputSet.contains(inputIdentifier)) {
-      synchronized (completedInputSet) {
-        if (!completedInputSet.contains(inputIdentifier)) {
-          fetchedInput.commit();
-          committed = true;
-          logIndividualFetchComplete(copyDuration, fetchedBytes, decompressedLength, fetchedInput,
-              srcAttemptIdentifier);
-
-          // Processing counters for completed and commit fetches only. Need
-          // additional counters for excessive fetches - which primarily comes
-          // in after speculation or retries.
-          shuffledInputsCounter.increment(1);
-          bytesShuffledCounter.increment(fetchedBytes);
-          if (fetchedInput.getType() == Type.MEMORY) {
-            bytesShuffledToMemCounter.increment(fetchedBytes);
-          } else if (fetchedInput.getType() == Type.DISK) {
-            bytesShuffledToDiskCounter.increment(fetchedBytes);
-          } else if (fetchedInput.getType() == Type.DISK_DIRECT) {
-            bytesShuffledDirectDiskCounter.increment(fetchedBytes);
-          }
-          decompressedDataSizeCounter.increment(decompressedLength);
-
-          registerCompletedInput(fetchedInput);
-          lock.lock();
-          try {
-            totalBytesShuffledTillNow += fetchedBytes;
-          } finally {
-            lock.unlock();
-          }
-          logProgress();
-        }
-      }
-    }
-    if (!committed) {
-      fetchedInput.abort(); // If this fails, the fetcher may attempt another abort.
-    } else {
-      lock.lock();
-      try {
-        // Signal the wakeLoop to check for termination.
-        wakeLoop.signal();
-      } finally {
-        lock.unlock();
-      }
-    }
-    // TODO NEWTEZ Maybe inform fetchers, in case they have an alternate attempt of the same task in their queue.
-  }
-
-  @Override
-  public void fetchFailed(String host,
-      InputAttemptIdentifier srcAttemptIdentifier, boolean connectFailed) {
-    // TODO NEWTEZ. Implement logic to report fetch failures after a threshold.
-    // For now, reporting immediately.
-    LOG.info("Fetch failed for src: " + srcAttemptIdentifier
-        + "InputIdentifier: " + srcAttemptIdentifier + ", connectFailed: "
-        + connectFailed);
-    failedShufflesCounter.increment(1);
-    if (srcAttemptIdentifier == null) {
-      String message = "Received fetchFailure for an unknown src (null)";
-      LOG.fatal(message);
-      inputContext.fatalError(null, message);
-    } else {
-    InputReadErrorEvent readError = InputReadErrorEvent.create(
-        "Fetch failure while fetching from "
-            + TezRuntimeUtils.getTaskAttemptIdentifier(
-            inputContext.getSourceVertexName(),
-            srcAttemptIdentifier.getInputIdentifier().getInputIndex(),
-            srcAttemptIdentifier.getAttemptNumber()),
-        srcAttemptIdentifier.getInputIdentifier().getInputIndex(),
-        srcAttemptIdentifier.getAttemptNumber());
-    
-    List<Event> failedEvents = Lists.newArrayListWithCapacity(1);
-    failedEvents.add(readError);
-    inputContext.sendEvents(failedEvents);
-    }
-  }
-  /////////////////// End of Methods from FetcherCallbackHandler
-
-  public void shutdown() throws InterruptedException {
-    if (!isShutdown.getAndSet(true)) {
-      // Shut down any pending fetchers
-      LOG.info("Shutting down pending fetchers on source" + srcNameTrimmed + ": "
-          + runningFetchers.size());
-      lock.lock();
-      try {
-        wakeLoop.signal(); // signal the fetch-scheduler
-        for (Fetcher fetcher : runningFetchers) {
-          fetcher.shutdown(); // This could be parallelized.
-        }
-      } finally {
-        lock.unlock();
-      }
-
-      if (this.schedulerExecutor != null && !this.schedulerExecutor.isShutdown()) {
-        this.schedulerExecutor.shutdownNow();
-      }
-      if (this.fetcherExecutor != null && !this.fetcherExecutor.isShutdown()) {
-        this.fetcherExecutor.shutdownNow(); // Interrupts all running fetchers.
-      }
-    }
-    //All threads are shutdown.  It is safe to shutdown SSL factory
-    if (httpConnectionParams.isSSLShuffleEnabled()) {
-      HttpConnection.cleanupSSLFactory();
-    }
-  }
-
-  private void registerCompletedInput(FetchedInput fetchedInput) {
-    lock.lock();
-    try {
-      completedInputSet.add(fetchedInput.getInputAttemptIdentifier().getInputIdentifier());
-      completedInputs.add(fetchedInput);
-      if (!inputReadyNotificationSent.getAndSet(true)) {
-        // TODO Should eventually be controlled by Inputs which are processing the data.
-        inputContext.inputIsReady();
-      }
-      int numComplete = numCompletedInputs.incrementAndGet();
-      if (numComplete == numInputs) {
-        LOG.info("All inputs fetched for input vertex : " + inputContext.getSourceVertexName());
-      }
-    } finally {
-      lock.unlock();
-    }
-  }
-  
-  /////////////////// Methods for walking the available inputs
-  
-  /**
-   * @return true if there is another input ready for consumption.
-   */
-  public boolean newInputAvailable() {
-    FetchedInput head = completedInputs.peek();
-    if (head == null || head instanceof NullFetchedInput) {
-      return false;
-    } else {
-      return true;
-    }
-  }
-
-  /**
-   * @return true if all of the required inputs have been fetched.
-   */
-  public boolean allInputsFetched() {
-    lock.lock();
-    try {
-      return numCompletedInputs.get() == numInputs;
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  /**
-   * @return the next available input, or null if there are no available inputs.
-   *         This method will block if there are currently no available inputs,
-   *         but more may become available.
-   */
-  public FetchedInput getNextInput() throws InterruptedException {
-    FetchedInput input = null;
-    do {
-      // Check for no additional inputs
-      lock.lock();
-      try {
-        input = completedInputs.peek();
-        if (input == null && allInputsFetched()) {
-          break;
-        }
-      } finally {
-        lock.unlock();
-      }
-      input = completedInputs.take(); // block
-    } while (input instanceof NullFetchedInput);
-    return input;
-  }
-  /////////////////// End of methods for walking the available inputs
-
-
-  /**
-   * Fake input that is added to the completed input list in case an input does not have any data.
-   *
-   */
-  private class NullFetchedInput extends FetchedInput {
-
-    public NullFetchedInput(InputAttemptIdentifier inputAttemptIdentifier) {
-      super(Type.MEMORY, -1, -1, inputAttemptIdentifier, null);
-    }
-
-    @Override
-    public OutputStream getOutputStream() throws IOException {
-      throw new UnsupportedOperationException("Not supported for NullFetchedInput");
-    }
-
-    @Override
-    public InputStream getInputStream() throws IOException {
-      throw new UnsupportedOperationException("Not supported for NullFetchedInput");
-    }
-
-    @Override
-    public void commit() throws IOException {
-      throw new UnsupportedOperationException("Not supported for NullFetchedInput");
-    }
-
-    @Override
-    public void abort() throws IOException {
-      throw new UnsupportedOperationException("Not supported for NullFetchedInput");
-    }
-
-    @Override
-    public void free() {
-      throw new UnsupportedOperationException("Not supported for NullFetchedInput");
-    }
-  }
-
-  private void logProgress() {
-    double mbs = (double) totalBytesShuffledTillNow / (1024 * 1024);
-    int inputsDone = numInputs - numCompletedInputs.get();
-    long secsSinceStart = (System.currentTimeMillis() - startTime) / 1000 + 1;
-
-    double transferRate = mbs / secsSinceStart;
-    LOG.info("copy(" + inputsDone + " of " + numInputs +
-        ". Transfer rate (CumulativeDataFetched/TimeSinceInputStarted)) "
-        + mbpsFormat.format(transferRate) + " MB/s)");
-  }
-
-  private void logIndividualFetchComplete(long millis, long fetchedBytes, long decompressedLength,
-                                          FetchedInput fetchedInput,
-                                          InputAttemptIdentifier srcAttemptIdentifier) {
-    double rate = 0;
-    if (millis != 0) {
-      rate = fetchedBytes / ((double) millis / 1000);
-      rate = rate / (1024 * 1024);
-    }
-
-    LOG.info(
-        "Completed fetch for attempt: " + srcAttemptIdentifier + " to " + fetchedInput.getType() +
-            ", CompressedSize=" + fetchedBytes + ", DecompressedSize=" + decompressedLength +
-            ",EndTime=" + System.currentTimeMillis() + ", TimeTaken=" + millis + ", Rate=" +
-            mbpsFormat.format(rate) + " MB/s");
-  }
-  
-  private class SchedulerFutureCallback implements FutureCallback<Void> {
-
-    @Override
-    public void onSuccess(Void result) {
-      LOG.info("Scheduler thread completed");
-    }
-
-    @Override
-    public void onFailure(Throwable t) {
-      if (isShutdown.get()) {
-        LOG.info("Already shutdown. Ignoring error: " + t);
-      } else {
-        LOG.error("Scheduler failed with error: ", t);
-        inputContext.fatalError(t, "Shuffle Scheduler Failed");
-      }
-    }
-    
-  }
-  
-  private class FetchFutureCallback implements FutureCallback<FetchResult> {
-
-    private final Fetcher fetcher;
-    
-    public FetchFutureCallback(Fetcher fetcher) {
-      this.fetcher = fetcher;
-    }
-    
-    private void doBookKeepingForFetcherComplete() {
-      lock.lock();
-      try {
-        runningFetchers.remove(fetcher);
-        wakeLoop.signal();
-      } finally {
-        lock.unlock();
-      }
-    }
-    
-    @Override
-    public void onSuccess(FetchResult result) {
-      fetcher.shutdown();
-      if (isShutdown.get()) {
-        LOG.info("Already shutdown. Ignoring event from fetcher");
-      } else {
-        Iterable<InputAttemptIdentifier> pendingInputs = result.getPendingInputs();
-        if (pendingInputs != null && pendingInputs.iterator().hasNext()) {
-          InputHost inputHost = knownSrcHosts.get(InputHost.createIdentifier(result.getHost(), result.getPort()));
-          assert inputHost != null;
-          for (InputAttemptIdentifier input : pendingInputs) {
-            inputHost.addKnownInput(input);
-          }
-          pendingHosts.add(inputHost);
-        }
-        doBookKeepingForFetcherComplete();
-      }
-    }
-
-    @Override
-    public void onFailure(Throwable t) {
-      // Unsuccessful - the fetcher may not have shutdown correctly. Try shutting it down.
-      fetcher.shutdown();
-      if (isShutdown.get()) {
-        LOG.info("Already shutdown. Ignoring error from fetcher: " + t);
-      } else {
-        LOG.error("Fetcher failed with error: ", t);
-        shuffleError = t;
-        inputContext.fatalError(t, "Fetch failed");
-        doBookKeepingForFetcherComplete();
-      }
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/tez/blob/196bcca9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/SimpleFetchedInputAllocator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/SimpleFetchedInputAllocator.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/SimpleFetchedInputAllocator.java
deleted file mode 100644
index 20ee665..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/SimpleFetchedInputAllocator.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.runtime.library.shuffle.common.impl;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.tez.common.TezRuntimeFrameworkConfigs;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
-import org.apache.tez.runtime.library.common.Constants;
-import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
-import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
-import org.apache.tez.runtime.library.shuffle.common.DiskFetchedInput;
-import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
-import org.apache.tez.runtime.library.shuffle.common.FetchedInput.Type;
-import org.apache.tez.runtime.library.shuffle.common.FetchedInputAllocator;
-import org.apache.tez.runtime.library.shuffle.common.FetchedInputCallback;
-import org.apache.tez.runtime.library.shuffle.common.MemoryFetchedInput;
-
-
-/**
- * Usage: Create instance, setInitialMemoryAvailable(long), configureAndStart()
- *
- */
-@Private
-public class SimpleFetchedInputAllocator implements FetchedInputAllocator,
-    FetchedInputCallback {
-
-  private static final Log LOG = LogFactory.getLog(SimpleFetchedInputAllocator.class);
-  
-  private final Configuration conf;
-
-  private final TezTaskOutputFiles fileNameAllocator;
-  private final LocalDirAllocator localDirAllocator;
-
-  // Configuration parameters
-  private final long memoryLimit;
-  private final long maxSingleShuffleLimit;
-
-  private final long maxAvailableTaskMemory;
-  private final long initialMemoryAvailable;
-  
-  private volatile long usedMemory = 0;
-
-  public SimpleFetchedInputAllocator(String uniqueIdentifier, Configuration conf,
-      long maxTaskAvailableMemory, long memoryAvailable) {
-    this.conf = conf;    
-    this.maxAvailableTaskMemory = maxTaskAvailableMemory;
-    this.initialMemoryAvailable = memoryAvailable;
-    
-    this.fileNameAllocator = new TezTaskOutputFiles(conf,
-        uniqueIdentifier);
-    this.localDirAllocator = new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
-    
-    // Setup configuration
-    final float maxInMemCopyUse = conf.getFloat(
-        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT,
-        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT_DEFAULT);
-    if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
-      throw new IllegalArgumentException("Invalid value for "
-          + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT + ": "
-          + maxInMemCopyUse);
-    }
-    
-    long memReq = (long) (conf.getLong(Constants.TEZ_RUNTIME_TASK_MEMORY,
-        Math.min(maxAvailableTaskMemory, Integer.MAX_VALUE)) * maxInMemCopyUse);
-    
-    if (memReq <= this.initialMemoryAvailable) {
-      this.memoryLimit = memReq;
-    } else {
-      this.memoryLimit = initialMemoryAvailable;
-    }
-
-    LOG.info("RequestedMem=" + memReq + ", Allocated: " + this.memoryLimit);
-    
-    final float singleShuffleMemoryLimitPercent = conf.getFloat(
-        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT,
-        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT_DEFAULT);
-    if (singleShuffleMemoryLimitPercent <= 0.0f
-        || singleShuffleMemoryLimitPercent > 1.0f) {
-      throw new IllegalArgumentException("Invalid value for "
-          + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT + ": "
-          + singleShuffleMemoryLimitPercent);
-    }
-
-    this.maxSingleShuffleLimit = (long) (memoryLimit * singleShuffleMemoryLimitPercent);
-    
-    LOG.info("SimpleInputManager -> " + "MemoryLimit: " + 
-        this.memoryLimit + ", maxSingleMemLimit: " + this.maxSingleShuffleLimit);
-  }
-
-  @Private
-  public static long getInitialMemoryReq(Configuration conf, long maxAvailableTaskMemory) {
-    final float maxInMemCopyUse = conf.getFloat(
-        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT,
-        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT_DEFAULT);
-    if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
-      throw new IllegalArgumentException("Invalid value for "
-          + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT + ": "
-          + maxInMemCopyUse);
-    }
-    long memReq = (long) (conf.getLong(Constants.TEZ_RUNTIME_TASK_MEMORY,
-        Math.min(maxAvailableTaskMemory, Integer.MAX_VALUE)) * maxInMemCopyUse);
-    return memReq;
-  }
-
-  @Override
-  public synchronized FetchedInput allocate(long actualSize, long compressedSize,
-      InputAttemptIdentifier inputAttemptIdentifier) throws IOException {
-    if (actualSize > maxSingleShuffleLimit
-        || this.usedMemory + actualSize > this.memoryLimit) {
-      return new DiskFetchedInput(actualSize, compressedSize,
-          inputAttemptIdentifier, this, conf, localDirAllocator,
-          fileNameAllocator);
-    } else {
-      this.usedMemory += actualSize;
-      LOG.info("Used memory after allocating " + actualSize  + " : " + usedMemory);
-      return new MemoryFetchedInput(actualSize, compressedSize, inputAttemptIdentifier, this);
-    }
-  }
-
-  @Override
-  public synchronized FetchedInput allocateType(Type type, long actualSize,
-      long compressedSize, InputAttemptIdentifier inputAttemptIdentifier)
-      throws IOException {
-
-    switch (type) {
-    case DISK:
-      return new DiskFetchedInput(actualSize, compressedSize,
-          inputAttemptIdentifier, this, conf, localDirAllocator,
-          fileNameAllocator);
-    default:
-      return allocate(actualSize, compressedSize, inputAttemptIdentifier);
-    }
-  }
-
-  @Override
-  public synchronized void fetchComplete(FetchedInput fetchedInput) {
-    switch (fetchedInput.getType()) {
-    // Not tracking anything here.
-    case DISK:
-    case DISK_DIRECT:
-    case MEMORY:
-      break;
-    default:
-      throw new TezUncheckedException("InputType: " + fetchedInput.getType()
-          + " not expected for Broadcast fetch");
-    }
-  }
-
-  @Override
-  public synchronized void fetchFailed(FetchedInput fetchedInput) {
-    cleanup(fetchedInput);
-  }
-
-  @Override
-  public synchronized void freeResources(FetchedInput fetchedInput) {
-    cleanup(fetchedInput);
-  }
-
-  private void cleanup(FetchedInput fetchedInput) {
-    switch (fetchedInput.getType()) {
-    case DISK:
-      break;
-    case MEMORY:
-      unreserve(fetchedInput.getActualSize());
-      break;
-    default:
-      throw new TezUncheckedException("InputType: " + fetchedInput.getType()
-          + " not expected for Broadcast fetch");
-    }
-  }
-
-  private synchronized void unreserve(long size) {
-    this.usedMemory -= size;
-    LOG.info("Used memory after freeing " + size  + " : " + usedMemory);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/196bcca9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/package-info.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/package-info.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/package-info.java
deleted file mode 100644
index b69472e..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-@Private
-package org.apache.tez.runtime.library.shuffle.common.impl;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/196bcca9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/package-info.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/package-info.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/package-info.java
deleted file mode 100644
index 349c648..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-@Private
-package org.apache.tez.runtime.library.shuffle.common;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/196bcca9/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
index 9ff4f44..7d48056 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
@@ -34,9 +34,9 @@ import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.comparator.TezBytesComparator;
 import org.apache.tez.runtime.library.common.serializer.TezBytesWritableSerialization;
-import org.apache.tez.runtime.library.common.shuffle.impl.InMemoryReader;
-import org.apache.tez.runtime.library.common.shuffle.impl.InMemoryWriter;
-import org.apache.tez.runtime.library.common.shuffle.impl.MergeManager;
+import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.InMemoryReader;
+import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.InMemoryWriter;
+import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MergeManager;
 import org.apache.tez.runtime.library.common.sort.impl.IFile;
 import org.apache.tez.runtime.library.common.sort.impl.TezMerger;
 import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;

http://git-wip-us.apache.org/repos/asf/tez/blob/196bcca9/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
new file mode 100644
index 0000000..5cee83a
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.shuffle;
+
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.EnvironmentUpdateUtils;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.shuffle.FetchResult;
+import org.apache.tez.runtime.library.common.shuffle.FetchedInput;
+import org.apache.tez.runtime.library.common.shuffle.Fetcher;
+import org.apache.tez.runtime.library.common.shuffle.FetcherCallback;
+import org.apache.tez.runtime.library.common.shuffle.LocalDiskFetchedInput;
+import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/** Tests for {@link Fetcher}: local-disk vs. HTTP fetch selection and result reporting. */
+public class TestFetcher {
+  private static final String SHUFFLE_INPUT_FILE_PREFIX = "shuffle_input_file_";
+  private static final String HOST = "localhost";
+  private static final int PORT = 0;
+
+  /** With local fetch enabled setupLocalDiskFetch() is used, otherwise doHttpFetch(). */
+  @Test(timeout = 3000)
+  public void testLocalFetchModeSetting() throws Exception {
+    TezConfiguration conf = new TezConfiguration();
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, "true");
+    EnvironmentUpdateUtils.put(ApplicationConstants.Environment.NM_HOST.toString(), HOST);
+    InputAttemptIdentifier[] srcAttempts = {
+        new InputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1")
+    };
+    FetcherCallback fetcherCallback = mock(FetcherCallback.class);
+
+    Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null,
+        ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, true);
+    builder.assignWork(HOST, PORT, 0, Arrays.asList(srcAttempts));
+    Fetcher fetcher = spy(builder.build());
+
+    FetchResult fr = new FetchResult(HOST, PORT, 0, Arrays.asList(srcAttempts));
+    Fetcher.HostFetchResult hfr = new Fetcher.HostFetchResult(fr, srcAttempts, false);
+    doReturn(hfr).when(fetcher).setupLocalDiskFetch();
+    doReturn(null).when(fetcher).doHttpFetch();
+    doNothing().when(fetcher).shutdown();
+
+    fetcher.call();
+
+    verify(fetcher).setupLocalDiskFetch();
+    verify(fetcher, never()).doHttpFetch();
+
+    // When disabled use http fetch
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, "false");
+    builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null,
+        ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, false);
+    builder.assignWork(HOST, PORT, 0, Arrays.asList(srcAttempts));
+    fetcher = spy(builder.build());
+
+    doReturn(null).when(fetcher).setupLocalDiskFetch();
+    doReturn(hfr).when(fetcher).doHttpFetch();
+    doNothing().when(fetcher).shutdown();
+
+    fetcher.call();
+
+    verify(fetcher, never()).setupLocalDiskFetch();
+    verify(fetcher).doHttpFetch();
+  }
+
+  /** Index lookup fails for inputs 2 and 4; inputs 0, 1 and 3 must be fetched from disk. */
+  @Test(timeout = 3000)
+  public void testSetupLocalDiskFetch() throws Exception {
+    InputAttemptIdentifier[] srcAttempts = {
+        new InputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0"),
+        new InputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1"),
+        new InputAttemptIdentifier(2, 3, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2"),
+        new InputAttemptIdentifier(3, 4, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3"),
+        new InputAttemptIdentifier(4, 5, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_4")
+    };
+    final int FIRST_FAILED_ATTEMPT_IDX = 2;
+    final int SECOND_FAILED_ATTEMPT_IDX = 4;
+    final int[] successfulAttempts = {0, 1, 3};
+
+    TezConfiguration conf = new TezConfiguration();
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, "true");
+    EnvironmentUpdateUtils.put(ApplicationConstants.Environment.NM_HOST.toString(), HOST);
+    int partition = 42;
+    FetcherCallback callback = mock(FetcherCallback.class);
+    Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(callback, null, null,
+        ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, true);
+    builder.assignWork(HOST, PORT, partition, Arrays.asList(srcAttempts));
+    Fetcher fetcher = spy(builder.build());
+
+    doAnswer(new Answer<Path>() {
+      @Override
+      public Path answer(InvocationOnMock invocation) throws Throwable {
+        Object[] args = invocation.getArguments();
+        return new Path(SHUFFLE_INPUT_FILE_PREFIX + args[0]);
+      }
+    }).when(fetcher).getShuffleInputFileName(anyString(), anyString());
+
+    doAnswer(new Answer<TezIndexRecord>() {
+      @Override
+      public TezIndexRecord answer(InvocationOnMock invocation) throws Throwable {
+        Object[] args = invocation.getArguments();
+        InputAttemptIdentifier srcAttemptId = (InputAttemptIdentifier) args[0];
+        String pathComponent = srcAttemptId.getPathComponent();
+        int len = pathComponent.length();
+        long p = Long.parseLong(pathComponent.substring(len - 1, len));
+        // Fail the 3rd one and 5th one.
+        if (p == FIRST_FAILED_ATTEMPT_IDX || p == SECOND_FAILED_ATTEMPT_IDX) {
+          throw new IOException("failing on 3/5th input to simulate failure case");
+        }
+        // match with params for verifyFetchSucceeded below.
+        return new TezIndexRecord(p * 10, p * 1000, p * 100);
+      }
+    }).when(fetcher).getTezIndexRecord(any(InputAttemptIdentifier.class));
+
+    doNothing().when(fetcher).shutdown();
+    doNothing().when(callback).fetchSucceeded(anyString(), any(InputAttemptIdentifier.class),
+        any(FetchedInput.class), anyLong(), anyLong(), anyLong());
+    doNothing().when(callback).fetchFailed(anyString(), any(InputAttemptIdentifier.class), eq(false));
+
+    FetchResult fetchResult = fetcher.call();
+
+    verify(fetcher).setupLocalDiskFetch();
+
+    // expect 3 successes and 2 failures
+    for (int i : successfulAttempts) {
+      verifyFetchSucceeded(callback, srcAttempts[i], conf);
+    }
+    verify(callback).fetchFailed(eq(HOST), eq(srcAttempts[FIRST_FAILED_ATTEMPT_IDX]), eq(false));
+    verify(callback).fetchFailed(eq(HOST), eq(srcAttempts[SECOND_FAILED_ATTEMPT_IDX]), eq(false));
+
+    Assert.assertEquals("fetchResult host", HOST, fetchResult.getHost());
+    Assert.assertEquals("fetchResult partition", partition, fetchResult.getPartition());
+    Assert.assertEquals("fetchResult port", PORT, fetchResult.getPort());
+
+    // 3rd and 5th attempt failed, so they must come back as pending inputs, in order.
+    List<InputAttemptIdentifier> pendingInputs = Lists.newArrayList(fetchResult.getPendingInputs());
+    Assert.assertEquals("fetchResult pending inputs", Arrays.asList(
+        srcAttempts[FIRST_FAILED_ATTEMPT_IDX], srcAttempts[SECOND_FAILED_ATTEMPT_IDX]), pendingInputs);
+  }
+
+  /** Checks the fetchSucceeded callback and captured input for one successful attempt. */
+  protected void verifyFetchSucceeded(FetcherCallback callback,
+      InputAttemptIdentifier srcAttemptId, Configuration conf) throws IOException {
+    String pathComponent = srcAttemptId.getPathComponent();
+    int len = pathComponent.length();
+    long p = Long.parseLong(pathComponent.substring(len - 1, len));
+    ArgumentCaptor<LocalDiskFetchedInput> capturedFetchedInput =
+        ArgumentCaptor.forClass(LocalDiskFetchedInput.class);
+    verify(callback).fetchSucceeded(eq(HOST), eq(srcAttemptId), capturedFetchedInput.capture(),
+        eq(p * 100), eq(p * 1000), anyLong());
+    LocalDiskFetchedInput f = capturedFetchedInput.getValue();
+    Assert.assertEquals("success callback filename", SHUFFLE_INPUT_FILE_PREFIX + pathComponent,
+        f.getInputFile().toString());
+    Assert.assertTrue("success callback fs", f.getLocalFS() instanceof LocalFileSystem);
+    Assert.assertEquals("success callback start offset", p * 10, f.getStartOffset());
+    Assert.assertEquals("success callback raw size", p * 1000, f.getActualSize());
+    Assert.assertEquals("success callback compressed size", p * 100, f.getCompressedSize());
+    Assert.assertEquals("success callback input id", srcAttemptId, f.getInputAttemptIdentifier());
+    Assert.assertEquals("success callback type", FetchedInput.Type.DISK_DIRECT, f.getType());
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/196bcca9/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestFetcher.java
deleted file mode 100644
index e97ff91..0000000
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestFetcher.java
+++ /dev/null
@@ -1,296 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.runtime.library.common.shuffle.impl;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyList;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.runtime.api.InputContext;
-import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
-import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
-import org.apache.tez.runtime.library.common.security.SecureShuffleUtils;
-import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
-import org.apache.tez.runtime.library.exceptions.FetcherReadTimeoutException;
-import org.apache.tez.runtime.library.shuffle.common.HttpConnection;
-import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import javax.crypto.SecretKey;
-
-public class TestFetcher {
-
-  public static final String SHUFFLE_INPUT_FILE_PREFIX = "shuffle_input_file_";
-  public static final String HOST = "localhost";
-  public static final int PORT = 0;
-
-  static final Log LOG = LogFactory.getLog(TestFetcher.class);
-
-  @Test(timeout = 5000)
-  public void testSetupLocalDiskFetch() throws Exception {
-    Configuration conf = new TezConfiguration();
-    ShuffleScheduler scheduler = mock(ShuffleScheduler.class);
-    MergeManager merger = mock(MergeManager.class);
-    ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
-    Shuffle shuffle = mock(Shuffle.class);
-    InputContext inputContext = mock(InputContext.class);
-    when(inputContext.getCounters()).thenReturn(new TezCounters());
-    when(inputContext.getSourceVertexName()).thenReturn("");
-
-    Fetcher fetcher = new Fetcher(null, scheduler, merger, metrics, shuffle, null,
-        false, 0, null, inputContext, conf, true);
-    Fetcher spyFetcher = spy(fetcher);
-
-    MapHost host = new MapHost(1, HOST + ":" + PORT,
-        "http://" + HOST + ":" + PORT + "/mapOutput?job=job_123&&reduce=1&map=");
-    List<InputAttemptIdentifier> srcAttempts = Arrays.asList(
-        new InputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0"),
-        new InputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1"),
-        new InputAttemptIdentifier(2, 3, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2"),
-        new InputAttemptIdentifier(3, 4, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3"),
-        new InputAttemptIdentifier(4, 4, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_4")
-    );
-    final int FIRST_FAILED_ATTEMPT_IDX = 2;
-    final int SECOND_FAILED_ATTEMPT_IDX = 4;
-    final int[] sucessfulAttemptsIndexes = { 0, 1, 3 };
-
-    doReturn(srcAttempts).when(scheduler).getMapsForHost(host);
-
-    doAnswer(new Answer<MapOutput>() {
-      @Override
-      public MapOutput answer(InvocationOnMock invocation) throws Throwable {
-        Object[] args = invocation.getArguments();
-        MapOutput mapOutput = mock(MapOutput.class);
-        doReturn(MapOutput.Type.DISK_DIRECT).when(mapOutput).getType();
-        doReturn(args[0]).when(mapOutput).getAttemptIdentifier();
-        return mapOutput;
-      }
-    }).when(spyFetcher)
-        .getMapOutputForDirectDiskFetch(any(InputAttemptIdentifier.class), any(Path.class),
-            any(TezIndexRecord.class));
-
-    doAnswer(new Answer<Path>() {
-      @Override
-      public Path answer(InvocationOnMock invocation) throws Throwable {
-        Object[] args = invocation.getArguments();
-        return new Path(SHUFFLE_INPUT_FILE_PREFIX + args[0]);
-      }
-    }).when(spyFetcher).getShuffleInputFileName(anyString(), anyString());
-
-    doAnswer(new Answer<TezIndexRecord>() {
-      @Override
-      public TezIndexRecord answer(InvocationOnMock invocation) throws Throwable {
-        Object[] args = invocation.getArguments();
-        String pathComponent = (String) args[0];
-        int len = pathComponent.length();
-        long p = Long.valueOf(pathComponent.substring(len - 1, len));
-        if (p == FIRST_FAILED_ATTEMPT_IDX || p == SECOND_FAILED_ATTEMPT_IDX) {
-          throw new IOException("failing to simulate failure case");
-        }
-        // match with params for copySucceeded below.
-        return new TezIndexRecord(p * 10, p * 1000, p * 100);
-      }
-    }).when(spyFetcher).getIndexRecord(anyString(), eq(host.getPartitionId()));
-
-    doNothing().when(scheduler).copySucceeded(any(InputAttemptIdentifier.class), any(MapHost.class),
-        anyLong(), anyLong(), anyLong(), any(MapOutput.class));
-    doNothing().when(scheduler).putBackKnownMapOutput(host,
-        srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX));
-    doNothing().when(scheduler).putBackKnownMapOutput(host,
-        srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX));
-
-    spyFetcher.setupLocalDiskFetch(host);
-
-    // should have exactly 3 success and 1 failure.
-    for (int i : sucessfulAttemptsIndexes) {
-      verifyCopySucceeded(scheduler, host, srcAttempts, i);
-    }
-    verify(scheduler).copyFailed(srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX), host, true, false);
-    verify(scheduler).copyFailed(srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX), host, true, false);
-
-    verify(metrics, times(3)).successFetch();
-    verify(metrics, times(2)).failedFetch();
-
-    verify(spyFetcher).putBackRemainingMapOutputs(host);
-    verify(scheduler).putBackKnownMapOutput(host, srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX));
-    verify(scheduler).putBackKnownMapOutput(host, srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX));
-  }
-
-  /**
-   * Verifies that {@code scheduler.copySucceeded} was invoked once for {@code srcAttempts[p]}
-   * with {@code p * 100} and {@code p * 1000} as its third and fourth arguments, and that the
-   * captured MapOutput is a DISK_DIRECT output for that same attempt.
-   *
-   * <p>The expected values must stay in sync with the TezIndexRecord(p * 10, p * 1000, p * 100)
-   * stubbed for getIndexRecord by the setupLocalDiskFetch test above.
-   *
-   * @param scheduler mocked scheduler whose interactions are verified
-   * @param host host the attempt is expected to be reported against
-   * @param srcAttempts attempts served by {@code host}, indexed by {@code p}
-   * @param p index of the attempt to check; also the multiplier for the expected values
-   */
-  private void verifyCopySucceeded(ShuffleScheduler scheduler, MapHost host,
-      List<InputAttemptIdentifier> srcAttempts, long p) throws
-      IOException {
-    // need to verify filename, offsets, sizes wherever they are used.
-    InputAttemptIdentifier srcAttemptToMatch = srcAttempts.get((int) p);
-    // NOTE(review): filenameToMatch is computed but never asserted below.
-    String filenameToMatch = SHUFFLE_INPUT_FILE_PREFIX + srcAttemptToMatch.getPathComponent();
-    ArgumentCaptor<MapOutput> captureMapOutput = ArgumentCaptor.forClass(MapOutput.class);
-    // Third and fourth arguments are matched exactly; the fifth is unchecked (anyLong()).
-    verify(scheduler).copySucceeded(eq(srcAttemptToMatch), eq(host), eq(p * 100),
-        eq(p * 1000), anyLong(), captureMapOutput.capture());
-
-    // cannot use the equals of MapOutput as it compares id which is private. so doing it manually
-    // verify() above matched exactly one call, so index 0 holds that call's MapOutput.
-    MapOutput m = captureMapOutput.getAllValues().get(0);
-    Assert.assertTrue(m.getType().equals(MapOutput.Type.DISK_DIRECT) &&
-        m.getAttemptIdentifier().equals(srcAttemptToMatch));
-  }
-
-  /**
-   * HttpConnection test double. Its underlying HttpURLConnection is replaced by a Mockito mock
-   * that reports HTTP 200, the default shuffle header name and version, and an empty reply-URL
-   * hash. getInputStream, validate and cleanup are overridden so they never touch that
-   * connection.
-   */
-  static class FakeHttpConnection extends HttpConnection {
-
-    /** Swaps in a mocked HttpURLConnection stubbed with a successful shuffle response. */
-    public FakeHttpConnection(URL url,
-        HttpConnectionParams connParams, String logIdentifier, SecretKey jobTokenSecret)
-        throws IOException {
-      super(url, connParams, logIdentifier, jobTokenSecret);
-      this.connection = mock(HttpURLConnection.class);
-      when(connection.getResponseCode()).thenReturn(200);
-      when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
-          .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
-      when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))
-          .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
-      when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH)).thenReturn("");
-    }
-
-    // NOTE(review): the methods below presumably override HttpConnection; consider @Override.
-    /** Returns a new stream over 1024 zero bytes on every call. */
-    public DataInputStream getInputStream() throws IOException {
-      byte[] b = new byte[1024];
-      ByteArrayInputStream bin = new ByteArrayInputStream(b);
-      return new DataInputStream(bin);
-    }
-
-    /** Intentionally does nothing. */
-    public void validate() throws IOException {
-      //empty
-    }
-
-    /** Only logs the call; nothing is closed or disconnected. */
-    public void cleanup(boolean disconnect) throws IOException {
-      LOG.info("HttpConnection cleanup called with disconnect=" + disconnect);
-      //ignore
-    }
-  }
-
-  /**
-   * A read timeout while copying map output should lead to exactly one reconnect attempt
-   * (stubbed to fail), a single copyFailed report, one putBackRemainingMapOutputs call and
-   * three putBackKnownMapOutput calls on the scheduler. A second copyFromHost run follows.
-   * Note: every copyMapOutput call sleeps for 4 seconds, so this test is slow.
-   */
-  @Test
-  public void testWithRetry() throws Exception {
-    Configuration conf = new TezConfiguration();
-    // 3000 ms read/connect timeouts; the copyMapOutput stub below sleeps longer than this.
-    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT, 3000);
-    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT, 3000);
-    ShuffleScheduler scheduler = mock(ShuffleScheduler.class);
-    MergeManager merger = mock(MergeManager.class);
-
-    ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
-    Shuffle shuffle = mock(Shuffle.class);
-    InputContext inputContext = mock(InputContext.class);
-    when(inputContext.getCounters()).thenReturn(new TezCounters());
-    when(inputContext.getSourceVertexName()).thenReturn("");
-
-    HttpConnection.HttpConnectionParams httpConnectionParams =
-        ShuffleUtils.constructHttpShuffleConnectionParams(conf);
-    Fetcher mockFetcher =
-        new Fetcher(httpConnectionParams, scheduler, merger, metrics, shuffle, null,
-            false, 0, null, inputContext, conf, false);
-    // Spy on a real Fetcher so setupConnection/copyMapOutput can be stubbed individually.
-    final Fetcher fetcher = spy(mockFetcher);
-
-    final MapHost host = new MapHost(1, HOST + ":" + PORT,
-        "http://" + HOST + ":" + PORT + "/mapOutput?job=job_123&&reduce=1&map=");
-    final List<InputAttemptIdentifier> srcAttempts = Arrays.asList(
-        new InputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0"),
-        new InputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1"),
-        new InputAttemptIdentifier(3, 4, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3")
-    );
-    doReturn(srcAttempts).when(scheduler).getMapsForHost(host);
-    // The first connection succeeds; the copyMapOutput stub below re-stubs this to false.
-    doReturn(true).when(fetcher).setupConnection(host, srcAttempts);
-
-    // Inject a FakeHttpConnection (mocked HttpURLConnection) in place of a real connection.
-    URL url = ShuffleUtils.constructInputURL(host.getBaseUrl(), srcAttempts, false);
-    fetcher.httpConnection = new FakeHttpConnection(url, null, "", null);
-
-    // merger.reserve returns a MEMORY-typed MapOutput mock for the requested attempt.
-    doAnswer(new Answer<MapOutput>() {
-      @Override
-      public MapOutput answer(InvocationOnMock invocation) throws Throwable {
-        Object[] args = invocation.getArguments();
-        MapOutput mapOutput = mock(MapOutput.class);
-        doReturn(MapOutput.Type.MEMORY).when(mapOutput).getType();
-        doReturn(args[0]).when(mapOutput).getAttemptIdentifier();
-        return mapOutput;
-      }
-    }).when(merger).reserve(any(InputAttemptIdentifier.class), anyInt(), anyInt(), anyInt());
-
-    // Every copyMapOutput call simulates a read timeout: sleep, make reconnects fail, throw.
-    doAnswer(new Answer<Void>() {
-      @Override public Void answer(InvocationOnMock invocation) throws Throwable {
-        // Emulate an unresponsive host for 4 s (longer than the 3000 ms read timeout above).
-        Thread.sleep(4000);
-        // From here on, any reconnect attempt to this host fails.
-        doReturn(false).when(fetcher).setupConnection(host, srcAttempts);
-
-        // Report the simulated timeout to the fetcher.
-        throw new FetcherReadTimeoutException("creating fetcher socket read timeout exception");
-      }
-    }).when(fetcher).copyMapOutput(any(MapHost.class), any(DataInputStream.class));
-
-    try {
-      fetcher.copyFromHost(host);
-    } catch(IOException e) {
-      // ignored: the outcome is asserted through the verify() calls below
-    }
-    //setup connection should be called twice (1 for connect and another for retry)
-    verify(fetcher, times(2)).setupConnection(any(MapHost.class), anyList());
-    //since copyMapOutput consistently fails, it should call copyFailed once
-    verify(scheduler, times(1)).copyFailed(any(InputAttemptIdentifier.class), any(MapHost.class),
-          anyBoolean(), anyBoolean());
-
-    // Outputs are handed back: one putBackRemainingMapOutputs call and three
-    // putBackKnownMapOutput calls on the scheduler in total.
-    verify(fetcher, times(1)).putBackRemainingMapOutputs(any(MapHost.class));
-    verify(scheduler, times(3)).putBackKnownMapOutput(any(MapHost.class),
-        any(InputAttemptIdentifier.class));
-
-
-    //Verify by stopping the fetcher abruptly
-    // NOTE(review): stopped is assigned false below (i.e. NOT stopped), which does not match
-    // the "stopping abruptly" intent stated above -- confirm which value was meant.
-    try {
-      fetcher.stopped = false; // false = fetcher not stopped (see NOTE above)
-      fetcher.copyFromHost(host);
-      verify(fetcher, times(2)).putBackRemainingMapOutputs(any(MapHost.class));
-    } catch(IOException e) {
-      // NOTE(review): if copyFromHost throws, the verify() above is skipped and nothing is
-      // asserted for this second run.
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/196bcca9/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandler.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandler.java
deleted file mode 100644
index c969cf1..0000000
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandler.java
+++ /dev/null
@@ -1,173 +0,0 @@
-package org.apache.tez.runtime.library.common.shuffle.impl;
-
-import com.google.protobuf.ByteString;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.tez.common.TezCommonUtils;
-import org.apache.tez.common.TezUtilsInternal;
-import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.InputContext;
-import org.apache.tez.runtime.api.events.DataMovementEvent;
-import org.apache.tez.runtime.api.events.InputFailedEvent;
-import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
-import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.BitSet;
-import java.util.LinkedList;
-import java.util.List;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Tests for ShuffleInputEventHandler: each test feeds events to a handler built around a mocked
- * ShuffleScheduler and verifies the scheduler calls the handler makes.
- */
-public class TestShuffleInputEventHandler {
-
-  private static final String HOST = "localhost";
-  private static final int PORT = 8080;
-  private static final String PATH_COMPONENT = "attempt";
-
-  // Recreated before every test by setup().
-  private ShuffleInputEventHandler handler;
-  private ShuffleScheduler scheduler;
-
-  /** Returns a mocked InputContext whose getApplicationId() returns ApplicationId(1, 1). */
-  private InputContext createTezInputContext() {
-    ApplicationId applicationId = ApplicationId.newInstance(1, 1);
-    InputContext inputContext = mock(InputContext.class);
-    doReturn(applicationId).when(inputContext).getApplicationId();
-    return inputContext;
-  }
-
-  /**
-   * Builds a DataMovementEvent carrying a serialized DataMovementEventPayloadProto.
-   *
-   * @param srcIndex source index passed to DataMovementEvent.create
-   * @param targetIndex target index passed to DataMovementEvent.create
-   * @param emptyPartitionByteString empty-partition bitmap as produced by
-   *     createEmptyPartitionByteString, or null to leave it unset
-   * @param allPartitionsEmpty when true, host, port and path component are left unset
-   * @return the event; its payload always has a run duration of 10
-   */
-  private Event createDataMovementEvent(int srcIndex, int targetIndex,
-      ByteString emptyPartitionByteString, boolean allPartitionsEmpty) {
-    ShuffleUserPayloads.DataMovementEventPayloadProto.Builder builder =
-        ShuffleUserPayloads.DataMovementEventPayloadProto
-            .newBuilder();
-    if (!allPartitionsEmpty) {
-      builder.setHost(HOST);
-      builder.setPort(PORT);
-      builder.setPathComponent(PATH_COMPONENT);
-    }
-    builder.setRunDuration(10);
-    if (emptyPartitionByteString != null) {
-      builder.setEmptyPartitions(emptyPartitionByteString);
-    }
-    return DataMovementEvent
-        .create(srcIndex, targetIndex, 0, builder.build().toByteString().asReadOnlyByteBuffer());
-  }
-
-  /** Creates a fresh mocked scheduler and a handler around it before each test. */
-  @Before
-  public void setup() throws Exception {
-    InputContext inputContext = createTezInputContext();
-    scheduler = mock(ShuffleScheduler.class);
-    handler = new ShuffleInputEventHandler(inputContext, scheduler, false);
-  }
-
-  /** An event without empty partitions is reported via addKnownMapOutput. */
-  @Test
-  public void basicTest() throws IOException {
-    List<Event> events = new LinkedList<Event>();
-    int srcIdx = 0;
-    int targetIdx = 1;
-    Event dme = createDataMovementEvent(srcIdx, targetIdx, null, false);
-    events.add(dme);
-    handler.handleEvents(events);
-    InputAttemptIdentifier expectedIdentifier = new InputAttemptIdentifier(targetIdx, 0,
-        PATH_COMPONENT);
-
-    String baseUri = handler.getBaseURI(HOST, PORT, srcIdx).toString();
-    // The test expects the event's source index to be used as the partition id.
-    int partitionId = srcIdx;
-    verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId),
-        eq(baseUri), eq(expectedIdentifier));
-  }
-
-  /** An InputFailedEvent leads to obsoleteInput for the matching attempt identifier. */
-  @Test
-  public void testFailedEvent() throws IOException {
-    List<Event> events = new LinkedList<Event>();
-    int targetIdx = 1;
-    InputFailedEvent failedEvent = InputFailedEvent.create(targetIdx, 0);
-    events.add(failedEvent);
-    handler.handleEvents(events);
-    InputAttemptIdentifier expectedIdentifier = new InputAttemptIdentifier(targetIdx, 0);
-    verify(scheduler).obsoleteInput(eq(expectedIdentifier));
-  }
-
-  /**
-   * No host info and partition srcIdx marked empty: reported via copySucceeded with 0 for all
-   * three long arguments.
-   */
-  @Test
-  public void testAllPartitionsEmpty() throws IOException {
-    List<Event> events = new LinkedList<Event>();
-    int srcIdx = 0;
-    int targetIdx = 1;
-    Event dme = createDataMovementEvent(srcIdx, targetIdx, createEmptyPartitionByteString(srcIdx)
-        , true);
-    events.add(dme);
-    handler.handleEvents(events);
-
-    InputAttemptIdentifier expectedIdentifier = new InputAttemptIdentifier(targetIdx, 0);
-    // NOTE(review): 0l is easy to misread as 01; 0L would be clearer.
-    verify(scheduler).copySucceeded(eq(expectedIdentifier), any(MapHost.class), eq(0l),
-        eq(0l), eq(0l), any(MapOutput.class));
-  }
-
-  /**
-   * Host info present but partition srcIdx marked empty: still reported via copySucceeded with
-   * 0 for all three long arguments.
-   */
-  @Test
-  public void testCurrentPartitionEmpty() throws IOException {
-    List<Event> events = new LinkedList<Event>();
-    int srcIdx = 0;
-    int targetIdx = 1;
-    Event dme = createDataMovementEvent(srcIdx, targetIdx, createEmptyPartitionByteString(srcIdx)
-        , false);
-    events.add(dme);
-    handler.handleEvents(events);
-
-    InputAttemptIdentifier expectedIdentifier = new InputAttemptIdentifier(targetIdx, 0);
-
-    verify(scheduler).copySucceeded(eq(expectedIdentifier), any(MapHost.class), eq(0l),
-        eq(0l), eq(0l), any(MapOutput.class));
-  }
-
-  /** Only unrelated partition 100 is marked empty, so addKnownMapOutput is still called. */
-  @Test
-  public void testOtherPartitionEmpty() throws IOException {
-    List<Event> events = new LinkedList<Event>();
-    int srcIdx = 0;
-    int taskIndex = 1;
-    Event dme = createDataMovementEvent(srcIdx, taskIndex, createEmptyPartitionByteString(100),
-        false);
-    events.add(dme);
-    handler.handleEvents(events);
-
-    String baseUri = handler.getBaseURI(HOST, PORT, srcIdx).toString();
-    int partitionId = srcIdx;
-    InputAttemptIdentifier expectedIdentifier =
-        new InputAttemptIdentifier(taskIndex, 0, PATH_COMPONENT);
-
-    verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(baseUri),
-        eq(expectedIdentifier));
-  }
-
-  /** Returns the given partition indexes as a BitSet, serialized and compressed to a ByteString. */
-  private ByteString createEmptyPartitionByteString(int... emptyPartitions) throws IOException {
-    BitSet bitSet = new BitSet();
-    for (int i : emptyPartitions) {
-      bitSet.set(i);
-    }
-    return TezCommonUtils.compressByteArrayToByteString(TezUtilsInternal.toByteArray(bitSet));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/196bcca9/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
new file mode 100644
index 0000000..57d5e2c
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.shuffle.impl;
+
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.util.BitSet;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.shuffle.FetchedInputAllocator;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
+import org.junit.Test;
+
+import com.google.protobuf.ByteString;
+
+/**
+ * Tests for {@link ShuffleInputEventHandlerImpl}: each test feeds DataMovementEvents to the
+ * handler and verifies the resulting calls on a mocked {@link ShuffleManager}.
+ */
+public class TestShuffleInputEventHandlerImpl {
+
+  private static final String HOST = "localhost";
+  private static final int PORT = 8080;
+  private static final String PATH_COMPONENT = "attempttmp";
+
+  /** No empty partitions: the input is passed to addKnownInput with HOST and PORT. */
+  @Test
+  public void testSimple() throws IOException {
+    ShuffleManager shuffleManager = mock(ShuffleManager.class);
+    ShuffleInputEventHandlerImpl handler = createHandler(shuffleManager);
+
+    int taskIndex = 1;
+    Event dme = createDataMovementEvent(0, taskIndex, null);
+
+    List<Event> eventList = new LinkedList<Event>();
+    eventList.add(dme);
+    handler.handleEvents(eventList);
+
+    InputAttemptIdentifier expectedIdentifier = new InputAttemptIdentifier(taskIndex, 0,
+        PATH_COMPONENT);
+
+    verify(shuffleManager).addKnownInput(eq(HOST), eq(PORT), eq(expectedIdentifier), eq(0));
+  }
+
+  /** Partition 0 marked empty: the input is reported via addCompletedInputWithNoData. */
+  @Test
+  public void testCurrentPartitionEmpty() throws IOException {
+    ShuffleManager shuffleManager = mock(ShuffleManager.class);
+    ShuffleInputEventHandlerImpl handler = createHandler(shuffleManager);
+
+    int taskIndex = 1;
+    Event dme = createDataMovementEvent(0, taskIndex, createEmptyPartitionByteString(0));
+
+    List<Event> eventList = new LinkedList<Event>();
+    eventList.add(dme);
+    handler.handleEvents(eventList);
+
+    InputAttemptIdentifier expectedIdentifier = new InputAttemptIdentifier(taskIndex, 0);
+
+    verify(shuffleManager).addCompletedInputWithNoData(eq(expectedIdentifier));
+  }
+
+  /** Only partition 1 marked empty: the input is still passed to addKnownInput. */
+  @Test
+  public void testOtherPartitionEmpty() throws IOException {
+    ShuffleManager shuffleManager = mock(ShuffleManager.class);
+    ShuffleInputEventHandlerImpl handler = createHandler(shuffleManager);
+
+    int taskIndex = 1;
+    Event dme = createDataMovementEvent(0, taskIndex, createEmptyPartitionByteString(1));
+    List<Event> eventList = new LinkedList<Event>();
+    eventList.add(dme);
+    handler.handleEvents(eventList);
+
+    InputAttemptIdentifier expectedIdentifier =
+        new InputAttemptIdentifier(taskIndex, 0, PATH_COMPONENT);
+
+    verify(shuffleManager).addKnownInput(eq(HOST), eq(PORT), eq(expectedIdentifier), eq(0));
+  }
+
+  /** One empty and one non-empty event in a single batch each get their own manager call. */
+  @Test
+  public void testMultipleEvents1() throws IOException {
+    ShuffleManager shuffleManager = mock(ShuffleManager.class);
+    ShuffleInputEventHandlerImpl handler = createHandler(shuffleManager);
+
+    int taskIndex1 = 1;
+    Event dme1 = createDataMovementEvent(0, taskIndex1, createEmptyPartitionByteString(0));
+    int taskIndex2 = 2;
+    Event dme2 = createDataMovementEvent(0, taskIndex2, null);
+
+    List<Event> eventList = new LinkedList<Event>();
+    eventList.add(dme1);
+    eventList.add(dme2);
+    handler.handleEvents(eventList);
+
+    InputAttemptIdentifier expectedIdentifier1 = new InputAttemptIdentifier(taskIndex1, 0);
+    InputAttemptIdentifier expectedIdentifier2 =
+        new InputAttemptIdentifier(taskIndex2, 0, PATH_COMPONENT);
+
+    verify(shuffleManager).addCompletedInputWithNoData(eq(expectedIdentifier1));
+    verify(shuffleManager).addKnownInput(eq(HOST), eq(PORT), eq(expectedIdentifier2), eq(0));
+  }
+
+  /** Creates the handler under test with bare InputContext/FetchedInputAllocator mocks. */
+  private static ShuffleInputEventHandlerImpl createHandler(ShuffleManager shuffleManager) {
+    InputContext inputContext = mock(InputContext.class);
+    FetchedInputAllocator inputAllocator = mock(FetchedInputAllocator.class);
+    return new ShuffleInputEventHandlerImpl(inputContext, shuffleManager, inputAllocator, null,
+        false, 0);
+  }
+
+  /** Builds a DataMovementEvent for HOST:PORT/PATH_COMPONENT, optionally with empty partitions. */
+  private Event createDataMovementEvent(int srcIndex, int targetIndex,
+      ByteString emptyPartitionByteString) {
+    DataMovementEventPayloadProto.Builder builder = DataMovementEventPayloadProto.newBuilder();
+    builder.setHost(HOST);
+    builder.setPort(PORT);
+    // Use the constant so the payload cannot drift from the identifiers the tests expect.
+    builder.setPathComponent(PATH_COMPONENT);
+    if (emptyPartitionByteString != null) {
+      builder.setEmptyPartitions(emptyPartitionByteString);
+    }
+    return DataMovementEvent
+        .create(srcIndex, targetIndex, 0, builder.build().toByteString().asReadOnlyByteBuffer());
+  }
+
+  /** Returns the compressed byte form of a BitSet with the given partition indexes set. */
+  private ByteString createEmptyPartitionByteString(int... emptyPartitions) throws IOException {
+    BitSet bitSet = new BitSet();
+    for (int i : emptyPartitions) {
+      bitSet.set(i);
+    }
+    return TezCommonUtils.compressByteArrayToByteString(TezUtilsInternal.toByteArray(bitSet));
+  }
+}


Mime
View raw message