tez-commits mailing list archives

From: ss...@apache.org
Subject: [6/9] TEZ-1479. Disambiguate (refactor) between ShuffleInputEventHandlers and Fetchers. (sseth) (cherry picked from commit 7be5830a908602ff91a07d3020f2dddf7705d48f)
Date: Wed, 15 Oct 2014 18:59:42 GMT
http://git-wip-us.apache.org/repos/asf/tez/blob/196bcca9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
new file mode 100644
index 0000000..ed60280
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -0,0 +1,804 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.shuffle.impl;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.text.DecimalFormat;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import javax.crypto.SecretKey;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.tez.common.TezRuntimeFrameworkConfigs;
+import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.api.events.InputReadErrorEvent;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.InputIdentifier;
+import org.apache.tez.runtime.library.common.TezRuntimeUtils;
+import org.apache.tez.runtime.library.common.shuffle.FetchResult;
+import org.apache.tez.runtime.library.common.shuffle.FetchedInput;
+import org.apache.tez.runtime.library.common.shuffle.FetchedInputAllocator;
+import org.apache.tez.runtime.library.common.shuffle.Fetcher;
+import org.apache.tez.runtime.library.common.shuffle.FetcherCallback;
+import org.apache.tez.runtime.library.common.shuffle.HttpConnection;
+import org.apache.tez.runtime.library.common.shuffle.HttpConnection.HttpConnectionParams;
+import org.apache.tez.runtime.library.common.shuffle.InputHost;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
+import org.apache.tez.runtime.library.common.shuffle.FetchedInput.Type;
+import org.apache.tez.runtime.library.common.shuffle.Fetcher.FetcherBuilder;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+// This only knows how to deal with a single srcIndex for a given targetIndex.
+// In case the src task generates multiple outputs for the same target Index
+// (multiple src-indices), modifications will be required.
+public class ShuffleManager implements FetcherCallback {
+
+  private static final Log LOG = LogFactory.getLog(ShuffleManager.class);
+  
+  private final InputContext inputContext;
+  private final int numInputs;
+
+  private final DecimalFormat mbpsFormat = new DecimalFormat("0.00");
+
+  private final FetchedInputAllocator inputManager;
+
+  private final ListeningExecutorService fetcherExecutor;
+
+  private final ListeningExecutorService schedulerExecutor;
+  private final RunShuffleCallable schedulerCallable;
+  
+  private final BlockingQueue<FetchedInput> completedInputs;
+  private final AtomicBoolean inputReadyNotificationSent = new AtomicBoolean(false);
+  private final Set<InputIdentifier> completedInputSet;
+  private final ConcurrentMap<String, InputHost> knownSrcHosts;
+  private final BlockingQueue<InputHost> pendingHosts;
+  private final Set<InputAttemptIdentifier> obsoletedInputs;
+  private Set<Fetcher> runningFetchers;
+  
+  private final AtomicInteger numCompletedInputs = new AtomicInteger(0);
+  
+  private final long startTime;
+  private long lastProgressTime;
+  private long totalBytesShuffledTillNow;
+
+  // Required to be held when manipulating pendingHosts
+  private final ReentrantLock lock = new ReentrantLock();
+  private final Condition wakeLoop = lock.newCondition();
+  
+  private final int numFetchers;
+  
+  // Parameters required by Fetchers
+  private final SecretKey shuffleSecret;
+  private final CompressionCodec codec;
+  private final boolean localDiskFetchEnabled;
+  private final boolean sharedFetchEnabled;
+  
+  private final int ifileBufferSize;
+  private final boolean ifileReadAhead;
+  private final int ifileReadAheadLength;
+  
+  private final String srcNameTrimmed; 
+  
+  private final AtomicBoolean isShutdown = new AtomicBoolean(false);
+
+  private final TezCounter shuffledInputsCounter;
+  private final TezCounter failedShufflesCounter;
+  private final TezCounter bytesShuffledCounter;
+  private final TezCounter decompressedDataSizeCounter;
+  private final TezCounter bytesShuffledToDiskCounter;
+  private final TezCounter bytesShuffledToMemCounter;
+  private final TezCounter bytesShuffledDirectDiskCounter;
+  
+  private volatile Throwable shuffleError;
+  private final HttpConnectionParams httpConnectionParams;
+  
+
+  private final LocalDirAllocator localDirAllocator;
+  private final RawLocalFileSystem localFs;
+  private final Path[] localDisks;
+  private final static String localhostName = NetUtils.getHostname();
+
+  // TODO More counters - FetchErrors, speed?
+  
+  public ShuffleManager(InputContext inputContext, Configuration conf, int numInputs,
+      int bufferSize, boolean ifileReadAheadEnabled, int ifileReadAheadLength,
+      CompressionCodec codec, FetchedInputAllocator inputAllocator) throws IOException {
+    this.inputContext = inputContext;
+    this.numInputs = numInputs;
+    
+    this.shuffledInputsCounter = inputContext.getCounters().findCounter(TaskCounter.NUM_SHUFFLED_INPUTS);
+    this.failedShufflesCounter = inputContext.getCounters().findCounter(TaskCounter.NUM_FAILED_SHUFFLE_INPUTS);
+    this.bytesShuffledCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES);
+    this.decompressedDataSizeCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_DECOMPRESSED);
+    this.bytesShuffledToDiskCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_TO_DISK);
+    this.bytesShuffledToMemCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_TO_MEM);
+    this.bytesShuffledDirectDiskCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_DISK_DIRECT);
+  
+    this.ifileBufferSize = bufferSize;
+    this.ifileReadAhead = ifileReadAheadEnabled;
+    this.ifileReadAheadLength = ifileReadAheadLength;
+    this.codec = codec;
+    this.inputManager = inputAllocator;
+    this.localDiskFetchEnabled = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH,
+        TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH_DEFAULT);
+    this.sharedFetchEnabled = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH,
+        TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH_DEFAULT);
+    
+    this.srcNameTrimmed = TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName());
+  
+    completedInputSet = Collections.newSetFromMap(new ConcurrentHashMap<InputIdentifier, Boolean>(numInputs));
+    completedInputs = new LinkedBlockingQueue<FetchedInput>(numInputs);
+    knownSrcHosts = new ConcurrentHashMap<String, InputHost>();
+    pendingHosts = new LinkedBlockingQueue<InputHost>();
+    obsoletedInputs = Collections.newSetFromMap(new ConcurrentHashMap<InputAttemptIdentifier, Boolean>());
+    runningFetchers = Collections.newSetFromMap(new ConcurrentHashMap<Fetcher, Boolean>());
+
+    int maxConfiguredFetchers = 
+        conf.getInt(
+            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES, 
+            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES_DEFAULT);
+    
+    this.numFetchers = Math.min(maxConfiguredFetchers, numInputs);
+    
+    ExecutorService fetcherRawExecutor = Executors.newFixedThreadPool(
+        numFetchers,
+        new ThreadFactoryBuilder().setDaemon(true)
+            .setNameFormat("Fetcher [" + srcNameTrimmed + "] #%d " + localhostName).build());
+    this.fetcherExecutor = MoreExecutors.listeningDecorator(fetcherRawExecutor);
+    
+    ExecutorService schedulerRawExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
+        .setDaemon(true).setNameFormat("ShuffleRunner [" + srcNameTrimmed + "]").build());
+    this.schedulerExecutor = MoreExecutors.listeningDecorator(schedulerRawExecutor);
+    this.schedulerCallable = new RunShuffleCallable(conf);
+    
+    this.startTime = System.currentTimeMillis();
+    this.lastProgressTime = startTime;
+    
+    this.shuffleSecret = ShuffleUtils
+        .getJobTokenSecretFromTokenBytes(inputContext
+            .getServiceConsumerMetaData(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID));
+    httpConnectionParams =
+        ShuffleUtils.constructHttpShuffleConnectionParams(conf);
+
+    this.localFs = (RawLocalFileSystem) FileSystem.getLocal(conf).getRaw();
+
+    this.localDirAllocator = new LocalDirAllocator(
+        TezRuntimeFrameworkConfigs.LOCAL_DIRS);
+
+    this.localDisks = Iterables.toArray(
+        localDirAllocator.getAllLocalPathsToRead("", conf), Path.class);
+
+    Arrays.sort(this.localDisks);
+
+    LOG.info(this.getClass().getSimpleName() + " : numInputs=" + numInputs + ", compressionCodec="
+        + (codec == null ? "NoCompressionCodec" : codec.getClass().getName()) + ", numFetchers="
+        + numFetchers + ", ifileBufferSize=" + ifileBufferSize + ", ifileReadAheadEnabled="
+        + ifileReadAhead + ", ifileReadAheadLength=" + ifileReadAheadLength +", "
+        + "localDiskFetchEnabled=" + localDiskFetchEnabled + ", "
+        + "sharedFetchEnabled=" + sharedFetchEnabled + ", "
+        + httpConnectionParams.toString());
+  }
+
+  public void run() throws IOException {
+    Preconditions.checkState(inputManager != null, "InputManager must be configured");
+
+    ListenableFuture<Void> runShuffleFuture = schedulerExecutor.submit(schedulerCallable);
+    Futures.addCallback(runShuffleFuture, new SchedulerFutureCallback());
+    // Shutdown this executor once this task, and the callback complete.
+    schedulerExecutor.shutdown();
+  }
+  
+  private class RunShuffleCallable implements Callable<Void> {
+
+    private final Configuration conf;
+
+    public RunShuffleCallable(Configuration conf) {
+      this.conf = conf;
+    }
+
+    @Override
+    public Void call() throws Exception {
+      while (!isShutdown.get() && numCompletedInputs.get() < numInputs) {
+        lock.lock();
+        try {
+          if (runningFetchers.size() >= numFetchers || pendingHosts.isEmpty()) {
+            if (numCompletedInputs.get() < numInputs) {
+              wakeLoop.await();
+            }
+          }
+        } finally {
+          lock.unlock();
+        }
+
+        if (shuffleError != null) {
+          // InputContext has already been informed of a fatal error. Relying on
+          // tez to kill the task.
+          break;
+        }
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("NumCompletedInputs: " + numCompletedInputs);
+        }
+        if (numCompletedInputs.get() < numInputs && !isShutdown.get()) {
+          lock.lock();
+          try {
+            int maxFetchersToRun = numFetchers - runningFetchers.size();
+            int count = 0;
+            while (pendingHosts.peek() != null && !isShutdown.get()) {
+              InputHost inputHost = null;
+              try {
+                inputHost = pendingHosts.take();
+              } catch (InterruptedException e) {
+                if (isShutdown.get()) {
+                  LOG.info("Interrupted and hasBeenShutdown, Breaking out of ShuffleScheduler Loop");
+                  break;
+                } else {
+                  throw e;
+                }
+              }
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Processing pending host: " + inputHost.toDetailedString());
+              }
+              if (inputHost.getNumPendingInputs() > 0 && !isShutdown.get()) {
+                LOG.info("Scheduling fetch for inputHost: " + inputHost.getIdentifier());
+                Fetcher fetcher = constructFetcherForHost(inputHost, conf);
+                runningFetchers.add(fetcher);
+                if (isShutdown.get()) {
+                  LOG.info("hasBeenShutdown, Breaking out of ShuffleScheduler Loop");
+                }
+                ListenableFuture<FetchResult> future = fetcherExecutor
+                    .submit(fetcher);
+                Futures.addCallback(future, new FetchFutureCallback(fetcher));
+                if (++count >= maxFetchersToRun) {
+                  break;
+                }
+              } else {
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("Skipping host: " + inputHost.getIdentifier()
+                      + " since it has no inputs to process");
+                }
+              }
+            }
+          } finally {
+            lock.unlock();
+          }
+        }
+      }
+      LOG.info("Shutting down FetchScheduler, Was Interrupted: " + Thread.currentThread().isInterrupted());
+      // TODO NEWTEZ Maybe clean up inputs.
+      if (!fetcherExecutor.isShutdown()) {
+        fetcherExecutor.shutdownNow();
+      }
+      return null;
+    }
+  }
+  
+  private Fetcher constructFetcherForHost(InputHost inputHost, Configuration conf) {
+
+    Path lockDisk = null;
+
+    if (sharedFetchEnabled) {
+      // pick a single lock disk from the edge name's hashcode + host hashcode
+      final int h = Math.abs(Objects.hashCode(this.srcNameTrimmed, inputHost.getHost()));
+      lockDisk = new Path(this.localDisks[h % this.localDisks.length], "locks");
+    }
+
+    FetcherBuilder fetcherBuilder = new FetcherBuilder(ShuffleManager.this,
+      httpConnectionParams, inputManager, inputContext.getApplicationId(),
+        shuffleSecret, srcNameTrimmed, conf, localFs, localDirAllocator,
+        lockDisk, localDiskFetchEnabled, sharedFetchEnabled);
+
+    if (codec != null) {
+      fetcherBuilder.setCompressionParameters(codec);
+    }
+    fetcherBuilder.setIFileParams(ifileReadAhead, ifileReadAheadLength);
+
+    // Remove obsolete inputs from the list being given to the fetcher. Also
+    // remove from the obsolete list.
+    List<InputAttemptIdentifier> pendingInputsForHost = inputHost
+        .clearAndGetPendingInputs();
+    for (Iterator<InputAttemptIdentifier> inputIter = pendingInputsForHost
+        .iterator(); inputIter.hasNext();) {
+      InputAttemptIdentifier input = inputIter.next();
+      // Avoid adding attempts which have already completed.
+      if (completedInputSet.contains(input.getInputIdentifier())) {
+        inputIter.remove();
+        continue;
+      }
+      // Avoid adding attempts which have been marked as OBSOLETE 
+      if (obsoletedInputs.contains(input)) {
+        inputIter.remove();
+      }
+    }
+    // TODO NEWTEZ Maybe limit the number of inputs being given to a single
+    // fetcher, especially in the case where #hosts < #fetchers
+    fetcherBuilder.assignWork(inputHost.getHost(), inputHost.getPort(),
+        inputHost.getSrcPhysicalIndex(), pendingInputsForHost);
+    LOG.info("Created Fetcher for host: " + inputHost.getHost()
+        + ", with inputs: " + pendingInputsForHost);
+    return fetcherBuilder.build();
+  }
+  
+  /////////////////// Methods for InputEventHandler
+  
+  public void addKnownInput(String hostName, int port,
+      InputAttemptIdentifier srcAttemptIdentifier, int srcPhysicalIndex) {
+    String identifier = InputHost.createIdentifier(hostName, port);
+    InputHost host = knownSrcHosts.get(identifier);
+    if (host == null) {
+      host = new InputHost(hostName, port, inputContext.getApplicationId(), srcPhysicalIndex);
+      assert identifier.equals(host.getIdentifier());
+      InputHost old = knownSrcHosts.putIfAbsent(identifier, host);
+      if (old != null) {
+        host = old;
+      }
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Adding input: " + srcAttemptIdentifier + ", to host: " + host);
+    }
+    host.addKnownInput(srcAttemptIdentifier);
+    lock.lock();
+    try {
+      boolean added = pendingHosts.offer(host);
+      if (!added) {
+        String errorMessage = "Unable to add host: " + host.getIdentifier() + " to pending queue";
+        LOG.error(errorMessage);
+        throw new TezUncheckedException(errorMessage);
+      }
+      wakeLoop.signal();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  public void addCompletedInputWithNoData(
+      InputAttemptIdentifier srcAttemptIdentifier) {
+    InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
+    LOG.info("No input data exists for SrcTask: " + inputIdentifier + ". Marking as complete.");
+    
+    if (!completedInputSet.contains(inputIdentifier)) {
+      synchronized (completedInputSet) {
+        if (!completedInputSet.contains(inputIdentifier)) {
+          registerCompletedInput(new NullFetchedInput(srcAttemptIdentifier));
+        }
+      }
+    }
+
+    // Awake the loop to check for termination.
+    lock.lock();
+    try {
+      wakeLoop.signal();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  public void addCompletedInputWithData(
+      InputAttemptIdentifier srcAttemptIdentifier, FetchedInput fetchedInput)
+      throws IOException {
+    InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
+
+    LOG.info("Received Data via Event: " + srcAttemptIdentifier + " to "
+        + fetchedInput.getType());
+    // Count irrespective of whether this is a copy of an already fetched input
+    lock.lock();
+    try {
+      lastProgressTime = System.currentTimeMillis();
+    } finally {
+      lock.unlock();
+    }
+
+    boolean committed = false;
+    if (!completedInputSet.contains(inputIdentifier)) {
+      synchronized (completedInputSet) {
+        if (!completedInputSet.contains(inputIdentifier)) {
+          fetchedInput.commit();
+          committed = true;
+          registerCompletedInput(fetchedInput);
+        }
+      }
+    }
+    if (!committed) {
+      fetchedInput.abort(); // If this fails, the fetcher may attempt another
+                            // abort.
+    } else {
+      lock.lock();
+      try {
+        // Signal the wakeLoop to check for termination.
+        wakeLoop.signal();
+      } finally {
+        lock.unlock();
+      }
+    }
+  }
+
+  public synchronized void obsoleteKnownInput(InputAttemptIdentifier srcAttemptIdentifier) {
+    obsoletedInputs.add(srcAttemptIdentifier);
+    // TODO NEWTEZ Maybe inform the fetcher about this. For now, this is used during the initial fetch list construction.
+  }
+
+  /////////////////// End of Methods for InputEventHandler
+  /////////////////// Methods from FetcherCallbackHandler
+  
+  @Override
+  public void fetchSucceeded(String host, InputAttemptIdentifier srcAttemptIdentifier,
+      FetchedInput fetchedInput, long fetchedBytes, long decompressedLength, long copyDuration)
+      throws IOException {
+    InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
+
+    // Count irrespective of whether this is a copy of an already fetched input
+    lock.lock();
+    try {
+      lastProgressTime = System.currentTimeMillis();
+    } finally {
+      lock.unlock();
+    }
+    
+    boolean committed = false;
+    if (!completedInputSet.contains(inputIdentifier)) {
+      synchronized (completedInputSet) {
+        if (!completedInputSet.contains(inputIdentifier)) {
+          fetchedInput.commit();
+          committed = true;
+          logIndividualFetchComplete(copyDuration, fetchedBytes, decompressedLength, fetchedInput,
+              srcAttemptIdentifier);
+
+          // Processing counters for completed and commit fetches only. Need
+          // additional counters for excessive fetches - which primarily comes
+          // in after speculation or retries.
+          shuffledInputsCounter.increment(1);
+          bytesShuffledCounter.increment(fetchedBytes);
+          if (fetchedInput.getType() == Type.MEMORY) {
+            bytesShuffledToMemCounter.increment(fetchedBytes);
+          } else if (fetchedInput.getType() == Type.DISK) {
+            bytesShuffledToDiskCounter.increment(fetchedBytes);
+          } else if (fetchedInput.getType() == Type.DISK_DIRECT) {
+            bytesShuffledDirectDiskCounter.increment(fetchedBytes);
+          }
+          decompressedDataSizeCounter.increment(decompressedLength);
+
+          registerCompletedInput(fetchedInput);
+          lock.lock();
+          try {
+            totalBytesShuffledTillNow += fetchedBytes;
+          } finally {
+            lock.unlock();
+          }
+          logProgress();
+        }
+      }
+    }
+    if (!committed) {
+      fetchedInput.abort(); // If this fails, the fetcher may attempt another abort.
+    } else {
+      lock.lock();
+      try {
+        // Signal the wakeLoop to check for termination.
+        wakeLoop.signal();
+      } finally {
+        lock.unlock();
+      }
+    }
+    // TODO NEWTEZ Maybe inform fetchers, in case they have an alternate attempt of the same task in their queue.
+  }
+
+  @Override
+  public void fetchFailed(String host,
+      InputAttemptIdentifier srcAttemptIdentifier, boolean connectFailed) {
+    // TODO NEWTEZ. Implement logic to report fetch failures after a threshold.
+    // For now, reporting immediately.
+    LOG.info("Fetch failed for src: " + srcAttemptIdentifier
+        + "InputIdentifier: " + srcAttemptIdentifier + ", connectFailed: "
+        + connectFailed);
+    failedShufflesCounter.increment(1);
+    if (srcAttemptIdentifier == null) {
+      String message = "Received fetchFailure for an unknown src (null)";
+      LOG.fatal(message);
+      inputContext.fatalError(null, message);
+    } else {
+    InputReadErrorEvent readError = InputReadErrorEvent.create(
+        "Fetch failure while fetching from "
+            + TezRuntimeUtils.getTaskAttemptIdentifier(
+            inputContext.getSourceVertexName(),
+            srcAttemptIdentifier.getInputIdentifier().getInputIndex(),
+            srcAttemptIdentifier.getAttemptNumber()),
+        srcAttemptIdentifier.getInputIdentifier().getInputIndex(),
+        srcAttemptIdentifier.getAttemptNumber());
+    
+    List<Event> failedEvents = Lists.newArrayListWithCapacity(1);
+    failedEvents.add(readError);
+    inputContext.sendEvents(failedEvents);
+    }
+  }
+  /////////////////// End of Methods from FetcherCallbackHandler
+
+  public void shutdown() throws InterruptedException {
+    if (!isShutdown.getAndSet(true)) {
+      // Shut down any pending fetchers
+      LOG.info("Shutting down pending fetchers on source" + srcNameTrimmed + ": "
+          + runningFetchers.size());
+      lock.lock();
+      try {
+        wakeLoop.signal(); // signal the fetch-scheduler
+        for (Fetcher fetcher : runningFetchers) {
+          fetcher.shutdown(); // This could be parallelized.
+        }
+      } finally {
+        lock.unlock();
+      }
+
+      if (this.schedulerExecutor != null && !this.schedulerExecutor.isShutdown()) {
+        this.schedulerExecutor.shutdownNow();
+      }
+      if (this.fetcherExecutor != null && !this.fetcherExecutor.isShutdown()) {
+        this.fetcherExecutor.shutdownNow(); // Interrupts all running fetchers.
+      }
+    }
+    //All threads are shutdown.  It is safe to shutdown SSL factory
+    if (httpConnectionParams.isSSLShuffleEnabled()) {
+      HttpConnection.cleanupSSLFactory();
+    }
+  }
+
+  private void registerCompletedInput(FetchedInput fetchedInput) {
+    lock.lock();
+    try {
+      completedInputSet.add(fetchedInput.getInputAttemptIdentifier().getInputIdentifier());
+      completedInputs.add(fetchedInput);
+      if (!inputReadyNotificationSent.getAndSet(true)) {
+        // TODO Should eventually be controlled by Inputs which are processing the data.
+        inputContext.inputIsReady();
+      }
+      int numComplete = numCompletedInputs.incrementAndGet();
+      if (numComplete == numInputs) {
+        LOG.info("All inputs fetched for input vertex : " + inputContext.getSourceVertexName());
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+  
+  /////////////////// Methods for walking the available inputs
+  
+  /**
+   * @return true if there is another input ready for consumption.
+   */
+  public boolean newInputAvailable() {
+    FetchedInput head = completedInputs.peek();
+    if (head == null || head instanceof NullFetchedInput) {
+      return false;
+    } else {
+      return true;
+    }
+  }
+
+  /**
+   * @return true if all of the required inputs have been fetched.
+   */
+  public boolean allInputsFetched() {
+    lock.lock();
+    try {
+      return numCompletedInputs.get() == numInputs;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * @return the next available input, or null if there are no available inputs.
+   *         This method will block if there are currently no available inputs,
+   *         but more may become available.
+   */
+  public FetchedInput getNextInput() throws InterruptedException {
+    FetchedInput input = null;
+    do {
+      // Check for no additional inputs
+      lock.lock();
+      try {
+        input = completedInputs.peek();
+        if (input == null && allInputsFetched()) {
+          break;
+        }
+      } finally {
+        lock.unlock();
+      }
+      input = completedInputs.take(); // block
+    } while (input instanceof NullFetchedInput);
+    return input;
+  }
+  /////////////////// End of methods for walking the available inputs
+
+
+  /**
+   * Fake input that is added to the completed input list in case an input does not have any data.
+   *
+   */
+  private class NullFetchedInput extends FetchedInput {
+
+    public NullFetchedInput(InputAttemptIdentifier inputAttemptIdentifier) {
+      super(Type.MEMORY, -1, -1, inputAttemptIdentifier, null);
+    }
+
+    @Override
+    public OutputStream getOutputStream() throws IOException {
+      throw new UnsupportedOperationException("Not supported for NullFetchedInput");
+    }
+
+    @Override
+    public InputStream getInputStream() throws IOException {
+      throw new UnsupportedOperationException("Not supported for NullFetchedInput");
+    }
+
+    @Override
+    public void commit() throws IOException {
+      throw new UnsupportedOperationException("Not supported for NullFetchedInput");
+    }
+
+    @Override
+    public void abort() throws IOException {
+      throw new UnsupportedOperationException("Not supported for NullFetchedInput");
+    }
+
+    @Override
+    public void free() {
+      throw new UnsupportedOperationException("Not supported for NullFetchedInput");
+    }
+  }
+
+  private void logProgress() {
+    double mbs = (double) totalBytesShuffledTillNow / (1024 * 1024);
+    int inputsDone = numInputs - numCompletedInputs.get();
+    long secsSinceStart = (System.currentTimeMillis() - startTime) / 1000 + 1;
+
+    double transferRate = mbs / secsSinceStart;
+    LOG.info("copy(" + inputsDone + " of " + numInputs +
+        ". Transfer rate (CumulativeDataFetched/TimeSinceInputStarted)) "
+        + mbpsFormat.format(transferRate) + " MB/s)");
+  }
+
+  private void logIndividualFetchComplete(long millis, long fetchedBytes, long decompressedLength,
+                                          FetchedInput fetchedInput,
+                                          InputAttemptIdentifier srcAttemptIdentifier) {
+    double rate = 0;
+    if (millis != 0) {
+      rate = fetchedBytes / ((double) millis / 1000);
+      rate = rate / (1024 * 1024);
+    }
+
+    LOG.info(
+        "Completed fetch for attempt: " + srcAttemptIdentifier + " to " + fetchedInput.getType() +
+            ", CompressedSize=" + fetchedBytes + ", DecompressedSize=" + decompressedLength +
+            ",EndTime=" + System.currentTimeMillis() + ", TimeTaken=" + millis + ", Rate=" +
+            mbpsFormat.format(rate) + " MB/s");
+  }
+  
+  private class SchedulerFutureCallback implements FutureCallback<Void> {
+
+    @Override
+    public void onSuccess(Void result) {
+      LOG.info("Scheduler thread completed");
+    }
+
+    @Override
+    public void onFailure(Throwable t) {
+      if (isShutdown.get()) {
+        LOG.info("Already shutdown. Ignoring error: " + t);
+      } else {
+        LOG.error("Scheduler failed with error: ", t);
+        inputContext.fatalError(t, "Shuffle Scheduler Failed");
+      }
+    }
+    
+  }
+  
+  private class FetchFutureCallback implements FutureCallback<FetchResult> {
+
+    private final Fetcher fetcher;
+    
+    public FetchFutureCallback(Fetcher fetcher) {
+      this.fetcher = fetcher;
+    }
+    
+    private void doBookKeepingForFetcherComplete() {
+      lock.lock();
+      try {
+        runningFetchers.remove(fetcher);
+        wakeLoop.signal();
+      } finally {
+        lock.unlock();
+      }
+    }
+    
+    @Override
+    public void onSuccess(FetchResult result) {
+      fetcher.shutdown();
+      if (isShutdown.get()) {
+        LOG.info("Already shutdown. Ignoring event from fetcher");
+      } else {
+        Iterable<InputAttemptIdentifier> pendingInputs = result.getPendingInputs();
+        if (pendingInputs != null && pendingInputs.iterator().hasNext()) {
+          InputHost inputHost = knownSrcHosts.get(InputHost.createIdentifier(result.getHost(), result.getPort()));
+          assert inputHost != null;
+          for (InputAttemptIdentifier input : pendingInputs) {
+            inputHost.addKnownInput(input);
+          }
+          pendingHosts.add(inputHost);
+        }
+        doBookKeepingForFetcherComplete();
+      }
+    }
+
+    @Override
+    public void onFailure(Throwable t) {
+      // Unsuccessful - the fetcher may not have shutdown correctly. Try shutting it down.
+      fetcher.shutdown();
+      if (isShutdown.get()) {
+        LOG.info("Already shutdown. Ignoring error from fetcher: " + t);
+      } else {
+        LOG.error("Fetcher failed with error: ", t);
+        shuffleError = t;
+        inputContext.fatalError(t, "Fetch failed");
+        doBookKeepingForFetcherComplete();
+      }
+    }
+  }
+}
+
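
The new ShuffleManager exposes a small polling API to the consuming Input: run() starts the scheduler callable, getNextInput() blocks until a FetchedInput is ready (and returns null once every input has been fetched), and shutdown() interrupts the scheduler and any running fetchers. Below is a minimal consumption-loop sketch that uses only the methods visible in the diff above; the class name, the consumeAll driver, and the buffer/read-ahead values are hypothetical placeholders, not anything introduced by this commit.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.library.common.shuffle.FetchedInput;
import org.apache.tez.runtime.library.common.shuffle.FetchedInputAllocator;
import org.apache.tez.runtime.library.common.shuffle.impl.ShuffleManager;

public class ShuffleManagerUsageSketch {

  // Hypothetical driver method, not part of the commit.
  static void consumeAll(InputContext inputContext, Configuration conf, int numInputs,
      FetchedInputAllocator allocator, CompressionCodec codec)
      throws IOException, InterruptedException {
    ShuffleManager shuffleManager = new ShuffleManager(inputContext, conf, numInputs,
        4096 /* ifile buffer size */, false /* ifile read-ahead */, 0, codec, allocator);
    shuffleManager.run(); // starts the scheduler; fetchers are submitted as hosts become known
    try {
      FetchedInput input;
      while ((input = shuffleManager.getNextInput()) != null) { // blocks; null => all fetched
        input.getInputStream().close(); // placeholder for real record processing
        input.free();                   // release the memory or disk held by this input
      }
    } finally {
      shuffleManager.shutdown(); // interrupts the scheduler and any running fetchers
    }
  }
}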

http://git-wip-us.apache.org/repos/asf/tez/blob/196bcca9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
deleted file mode 100644
index 3a15b65..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
+++ /dev/null
@@ -1,623 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.runtime.library.common.shuffle.impl;
-
-import java.io.IOException;
-import java.text.DecimalFormat;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.DelayQueue;
-import java.util.concurrent.Delayed;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.tez.common.TezUtilsInternal;
-import org.apache.tez.common.counters.TaskCounter;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.InputContext;
-import org.apache.tez.runtime.api.events.InputReadErrorEvent;
-import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
-import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
-import org.apache.tez.runtime.library.common.TezRuntimeUtils;
-import org.apache.tez.runtime.library.common.shuffle.impl.MapOutput.Type;
-
-import com.google.common.collect.Lists;
-
-class ShuffleScheduler {
-  static ThreadLocal<Long> shuffleStart = new ThreadLocal<Long>() {
-    protected Long initialValue() {
-      return 0L;
-    }
-  };
-
-  private static final Log LOG = LogFactory.getLog(ShuffleScheduler.class);
-  private static final long INITIAL_PENALTY = 2000l; // 2 seconds
-  private static final float PENALTY_GROWTH_RATE = 1.3f;
-  
-  // TODO NEWTEZ May need to be a string if attempting to fetch from multiple inputs.
-  private boolean[] finishedMaps;
-  private final int numInputs;
-  private int remainingMaps;
-  private Map<String, MapHost> mapLocations = new HashMap<String, MapHost>();
-  //TODO NEWTEZ Clean this and other maps at some point
-  private ConcurrentMap<String, InputAttemptIdentifier> pathToIdentifierMap = new ConcurrentHashMap<String, InputAttemptIdentifier>(); 
-  private Set<MapHost> pendingHosts = new HashSet<MapHost>();
-  private Set<InputAttemptIdentifier> obsoleteInputs = new HashSet<InputAttemptIdentifier>();
-  
-  private final Random random = new Random(System.currentTimeMillis());
-  private final DelayQueue<Penalty> penalties = new DelayQueue<Penalty>();
-  private final Referee referee;
-  private final Map<InputAttemptIdentifier, IntWritable> failureCounts =
-    new HashMap<InputAttemptIdentifier,IntWritable>(); 
-  private final Map<String,IntWritable> hostFailures = 
-    new HashMap<String,IntWritable>();
-  private final InputContext inputContext;
-  private final Shuffle shuffle;
-  private final TezCounter shuffledInputsCounter;
-  private final TezCounter skippedInputCounter;
-  private final TezCounter reduceShuffleBytes;
-  private final TezCounter reduceBytesDecompressed;
-  private final TezCounter failedShuffleCounter;
-  private final TezCounter bytesShuffledToDisk;
-  private final TezCounter bytesShuffledToDiskDirect;
-  private final TezCounter bytesShuffledToMem;
-
-  private final long startTime;
-  private long lastProgressTime;
-
-  private int maxTaskOutputAtOnce;
-  private int maxFetchFailuresBeforeReporting;
-  private boolean reportReadErrorImmediately = true; 
-  private int maxFailedUniqueFetches = 5;
-  private final int abortFailureLimit;
-  private int maxMapRuntime = 0;
-
-  private long totalBytesShuffledTillNow = 0;
-  private DecimalFormat  mbpsFormat = new DecimalFormat("0.00");
-  
-  public ShuffleScheduler(InputContext inputContext,
-                          Configuration conf,
-                          int numberOfInputs,
-                          Shuffle shuffle,
-                          TezCounter shuffledInputsCounter,
-                          TezCounter reduceShuffleBytes,
-                          TezCounter reduceBytesDecompressed,
-                          TezCounter failedShuffleCounter,
-                          TezCounter bytesShuffledToDisk,
-                          TezCounter bytesShuffledToDiskDirect,
-                          TezCounter bytesShuffledToMem) {
-    this.inputContext = inputContext;
-    this.numInputs = numberOfInputs;
-    abortFailureLimit = Math.max(30, numberOfInputs / 10);
-    remainingMaps = numberOfInputs;
-    finishedMaps = new boolean[remainingMaps]; // default init to false
-    this.referee = new Referee();
-    this.shuffle = shuffle;
-    this.shuffledInputsCounter = shuffledInputsCounter;
-    this.reduceShuffleBytes = reduceShuffleBytes;
-    this.reduceBytesDecompressed = reduceBytesDecompressed;
-    this.failedShuffleCounter = failedShuffleCounter;
-    this.bytesShuffledToDisk = bytesShuffledToDisk;
-    this.bytesShuffledToDiskDirect = bytesShuffledToDiskDirect;
-    this.bytesShuffledToMem = bytesShuffledToMem;
-    this.startTime = System.currentTimeMillis();
-    this.lastProgressTime = startTime;
-
-    this.maxFailedUniqueFetches = Math.min(numberOfInputs,
-        this.maxFailedUniqueFetches);
-    referee.start();
-    this.maxFetchFailuresBeforeReporting = 
-        conf.getInt(
-            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT,
-            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT_DEFAULT);
-    this.reportReadErrorImmediately = 
-        conf.getBoolean(
-            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR, 
-            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR_DEFAULT);
-    this.maxTaskOutputAtOnce = Math.max(1, conf.getInt(
-            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE,
-            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE_DEFAULT));
-    
-    this.skippedInputCounter = inputContext.getCounters().findCounter(TaskCounter.NUM_SKIPPED_INPUTS);
-
-    LOG.info("ShuffleScheduler running for sourceVertex: "
-        + inputContext.getSourceVertexName() + " with configuration: "
-        + "maxFetchFailuresBeforeReporting=" + maxFetchFailuresBeforeReporting
-        + ", reportReadErrorImmediately=" + reportReadErrorImmediately
-        + ", maxFailedUniqueFetches=" + maxFailedUniqueFetches
-        + ", abortFailureLimit=" + abortFailureLimit
-        + ", maxMapRuntime=" + maxMapRuntime);
-  }
-
-  public synchronized void copySucceeded(InputAttemptIdentifier srcAttemptIdentifier, 
-                                         MapHost host,
-                                         long bytesCompressed,
-                                         long bytesDecompressed,
-                                         long millis,
-                                         MapOutput output
-                                         ) throws IOException {
-
-    if (!isInputFinished(srcAttemptIdentifier.getInputIdentifier().getInputIndex())) {
-      if (output != null) {
-
-        failureCounts.remove(srcAttemptIdentifier);
-        if (host != null) {
-          hostFailures.remove(host.getHostIdentifier());
-        }
-
-        output.commit();
-        logIndividualFetchComplete(millis, bytesCompressed, bytesDecompressed, output,
-            srcAttemptIdentifier);
-        if (output.getType() == Type.DISK) {
-          bytesShuffledToDisk.increment(bytesCompressed);
-        } else if (output.getType() == Type.DISK_DIRECT) {
-          bytesShuffledToDiskDirect.increment(bytesCompressed);
-        } else {
-          bytesShuffledToMem.increment(bytesCompressed);
-        }
-        shuffledInputsCounter.increment(1);
-      } else {
-        // Output null implies that a physical input completion is being
-        // registered without needing to fetch data
-        skippedInputCounter.increment(1);
-      }
-      setInputFinished(srcAttemptIdentifier.getInputIdentifier().getInputIndex());
-      
-      if (--remainingMaps == 0) {
-        LOG.info("All inputs fetched for input vertex : " + inputContext.getSourceVertexName());
-        notifyAll();
-      }
-
-      // update the status
-      lastProgressTime = System.currentTimeMillis();
-      totalBytesShuffledTillNow += bytesCompressed;
-      logProgress();
-      reduceShuffleBytes.increment(bytesCompressed);
-      reduceBytesDecompressed.increment(bytesDecompressed);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("src task: "
-            + TezRuntimeUtils.getTaskAttemptIdentifier(
-                inputContext.getSourceVertexName(), srcAttemptIdentifier.getInputIdentifier().getInputIndex(),
-                srcAttemptIdentifier.getAttemptNumber()) + " done");
-      }
-    } else {
-      // input is already finished. duplicate fetch.
-      LOG.warn("Duplicate fetch of input no longer needs to be fetched: " + srcAttemptIdentifier);
-      // free the resource - specially memory
-      
-      // If the src does not generate data, output will be null.
-      if (output != null) {
-        output.abort();
-      }
-    }
-    // TODO NEWTEZ Should this be releasing the output, if not committed ? Possible memory leak in case of speculation.
-  }
-
-  private void logIndividualFetchComplete(long millis, long bytesCompressed, long bytesDecompressed,
-                                          MapOutput output,
-                                          InputAttemptIdentifier srcAttemptIdentifier) {
-    double rate = 0;
-    if (millis != 0) {
-      rate = bytesCompressed / ((double) millis / 1000);
-      rate = rate / (1024 * 1024);
-    }
-    LOG.info(
-        "Completed fetch for attempt: " + srcAttemptIdentifier + " to " + output.getType() +
-            ", CompressedSize=" + bytesCompressed + ", DecompressedSize=" + bytesDecompressed +
-            ",EndTime=" + System.currentTimeMillis() + ", TimeTaken=" + millis + ", Rate=" +
-            mbpsFormat.format(rate) + " MB/s");
-  }
-
-  private void logProgress() {
-    double mbs = (double) totalBytesShuffledTillNow / (1024 * 1024);
-    int inputsDone = numInputs - remainingMaps;
-    long secsSinceStart = (System.currentTimeMillis() - startTime) / 1000 + 1;
-
-    double transferRate = mbs / secsSinceStart;
-    LOG.info("copy(" + inputsDone + " of " + numInputs +
-        ". Transfer rate (CumulativeDataFetched/TimeSinceInputStarted)) "
-        + mbpsFormat.format(transferRate) + " MB/s)");
-  }
-
-  public synchronized void copyFailed(InputAttemptIdentifier srcAttempt,
-                                      MapHost host,
-                                      boolean readError,
-                                      boolean connectError) {
-    host.penalize();
-    int failures = 1;
-    if (failureCounts.containsKey(srcAttempt)) {
-      IntWritable x = failureCounts.get(srcAttempt);
-      x.set(x.get() + 1);
-      failures = x.get();
-    } else {
-      failureCounts.put(srcAttempt, new IntWritable(1));      
-    }
-    String hostPort = host.getHostIdentifier();
-    // TODO TEZ-922 hostFailures isn't really used for anything. Factor it into error
-    // reporting / potential blacklisting of hosts.
-    if (hostFailures.containsKey(hostPort)) {
-      IntWritable x = hostFailures.get(hostPort);
-      x.set(x.get() + 1);
-    } else {
-      hostFailures.put(hostPort, new IntWritable(1));
-    }
-    if (failures >= abortFailureLimit) {
-      // This task has seen too many fetch failures - report it as failed. The
-      // AM may retry it if max failures has not been reached.
-      
-      // Between the task and the AM - someone needs to determine who is at
-      // fault. If there's enough errors seen on the task, before the AM informs
-      // it about source failure, the task considers itself to have failed and
-      // allows the AM to re-schedule it.
-      IOException ioe = new IOException(failures
-            + " failures downloading "
-            + TezRuntimeUtils.getTaskAttemptIdentifier(
-                inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getInputIndex(),
-                srcAttempt.getAttemptNumber()));
-      ioe.fillInStackTrace();
-      shuffle.reportException(ioe);
-    }
-
-    failedShuffleCounter.increment(1);
-    checkAndInformAM(failures, srcAttempt, readError, connectError);
-
-    checkReducerHealth();
-    
-    long delay = (long) (INITIAL_PENALTY *
-        Math.pow(PENALTY_GROWTH_RATE, failures));
-    
-    penalties.add(new Penalty(host, delay));    
-  }
-
-  public void reportLocalError(IOException ioe) {
-    LOG.error("Shuffle failed : caused by local error", ioe);
-    shuffle.reportException(ioe);
-  }
-
-  // Notify the AM  
-  // after every read error, if 'reportReadErrorImmediately' is true or
-  // after every 'maxFetchFailuresBeforeReporting' failures
-  private void checkAndInformAM(
-      int failures, InputAttemptIdentifier srcAttempt, boolean readError,
-      boolean connectError) {
-    if ((reportReadErrorImmediately && (readError || connectError))
-        || ((failures % maxFetchFailuresBeforeReporting) == 0)) {
-      LOG.info("Reporting fetch failure for InputIdentifier: " 
-          + srcAttempt + " taskAttemptIdentifier: "
-          + TezRuntimeUtils.getTaskAttemptIdentifier(
-              inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getInputIndex(),
-              srcAttempt.getAttemptNumber()) + " to AM.");
-      List<Event> failedEvents = Lists.newArrayListWithCapacity(1);
-      failedEvents.add(InputReadErrorEvent.create("Fetch failure for "
-          + TezRuntimeUtils.getTaskAttemptIdentifier(
-          inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getInputIndex(),
-          srcAttempt.getAttemptNumber()) + " to jobtracker.", srcAttempt.getInputIdentifier()
-          .getInputIndex(), srcAttempt.getAttemptNumber()));
-
-      inputContext.sendEvents(failedEvents);      
-    }
-  }
-
-  private void checkReducerHealth() {
-    final float MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT = 0.5f;
-    final float MIN_REQUIRED_PROGRESS_PERCENT = 0.5f;
-    final float MAX_ALLOWED_STALL_TIME_PERCENT = 0.5f;
-
-    long totalFailures = failedShuffleCounter.getValue();
-    int doneMaps = numInputs - remainingMaps;
-    
-    boolean reducerHealthy =
-      (((float)totalFailures / (totalFailures + doneMaps))
-          < MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT);
-    
-    // check if the reducer has progressed enough
-    boolean reducerProgressedEnough =
-      (((float)doneMaps / numInputs)
-          >= MIN_REQUIRED_PROGRESS_PERCENT);
-
-    // check if the reducer is stalled for a long time
-    // duration for which the reducer is stalled
-    int stallDuration =
-      (int)(System.currentTimeMillis() - lastProgressTime);
-    
-    // duration for which the reducer ran with progress
-    int shuffleProgressDuration =
-      (int)(lastProgressTime - startTime);
-
-    // min time the reducer should run without getting killed
-    int minShuffleRunDuration =
-      (shuffleProgressDuration > maxMapRuntime)
-      ? shuffleProgressDuration
-          : maxMapRuntime;
-    
-    boolean reducerStalled =
-      (((float)stallDuration / minShuffleRunDuration)
-          >= MAX_ALLOWED_STALL_TIME_PERCENT);
-
-    // kill if not healthy and has insufficient progress
-    if ((failureCounts.size() >= maxFailedUniqueFetches ||
-        failureCounts.size() == (numInputs - doneMaps))
-        && !reducerHealthy
-        && (!reducerProgressedEnough || reducerStalled)) {
-      LOG.fatal("Shuffle failed with too many fetch failures " + "and insufficient progress!"
-          + "failureCounts=" + failureCounts.size() + ", pendingInputs=" + (numInputs - doneMaps)
-          + ", reducerHealthy=" + reducerHealthy + ", reducerProgressedEnough="
-          + reducerProgressedEnough + ", reducerStalled=" + reducerStalled);
-      String errorMsg = "Exceeded MAX_FAILED_UNIQUE_FETCHES; bailing-out.";
-      shuffle.reportException(new IOException(errorMsg));
-    }
-
-  }
-  
-  public synchronized void addKnownMapOutput(String inputHostName,
-                                             int port,
-                                             int partitionId,
-                                             String hostUrl,
-                                             InputAttemptIdentifier srcAttempt) {
-    String hostPort = (inputHostName + ":" + String.valueOf(port));
-    String identifier = MapHost.createIdentifier(hostPort, partitionId);
-    MapHost host = mapLocations.get(identifier);
-    if (host == null) {
-      host = new MapHost(partitionId, hostPort, hostUrl);
-      assert identifier.equals(host.getIdentifier());
-      mapLocations.put(identifier, host);
-    }
-    host.addKnownMap(srcAttempt);
-    pathToIdentifierMap.put(
-        getIdentifierFromPathAndReduceId(srcAttempt.getPathComponent(), partitionId), srcAttempt);
-
-    // Mark the host as pending
-    if (host.getState() == MapHost.State.PENDING) {
-      pendingHosts.add(host);
-      notifyAll();
-    }
-  }
-  
-  public synchronized void obsoleteInput(InputAttemptIdentifier srcAttempt) {
-    // The incoming srcAttempt does not contain a path component.
-    LOG.info("Adding obsolete input: " + srcAttempt);
-    obsoleteInputs.add(srcAttempt);
-  }
-  
-  public synchronized void putBackKnownMapOutput(MapHost host,
-                                                 InputAttemptIdentifier srcAttempt) {
-    host.addKnownMap(srcAttempt);
-  }
-
-  public synchronized MapHost getHost() throws InterruptedException {
-      while(pendingHosts.isEmpty()) {
-        wait();
-      }
-      
-      MapHost host = null;
-      Iterator<MapHost> iter = pendingHosts.iterator();
-      int numToPick = random.nextInt(pendingHosts.size());
-      for (int i=0; i <= numToPick; ++i) {
-        host = iter.next();
-      }
-      
-      pendingHosts.remove(host);     
-      host.markBusy();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Assigning " + host + " with " + host.getNumKnownMapOutputs() +
-            " to " + Thread.currentThread().getName());
-      }
-      shuffleStart.set(System.currentTimeMillis());
-      
-      return host;
-  }
-  
-  public InputAttemptIdentifier getIdentifierForFetchedOutput(
-      String path, int reduceId) {
-    return pathToIdentifierMap.get(getIdentifierFromPathAndReduceId(path, reduceId));
-  }
-  
-  private boolean inputShouldBeConsumed(InputAttemptIdentifier id) {
-    return (!obsoleteInputs.contains(id) && 
-             !isInputFinished(id.getInputIdentifier().getInputIndex()));
-  }
-  
-  public synchronized List<InputAttemptIdentifier> getMapsForHost(MapHost host) {
-    List<InputAttemptIdentifier> origList = host.getAndClearKnownMaps();
-
-    Map<Integer, InputAttemptIdentifier> dedupedList = new LinkedHashMap<Integer, InputAttemptIdentifier>();
-    Iterator<InputAttemptIdentifier> listItr = origList.iterator();
-    while (listItr.hasNext()) {
-      // we may want to try all versions of the input but with current retry
-      // behavior older ones are likely to be lost and should be ignored.
-      // This may be removed after TEZ-914
-      InputAttemptIdentifier id = listItr.next();
-      if (inputShouldBeConsumed(id)) {
-        Integer inputNumber = new Integer(id.getInputIdentifier().getInputIndex());
-        InputAttemptIdentifier oldId = dedupedList.get(inputNumber);
-        if (oldId == null || oldId.getAttemptNumber() < id.getAttemptNumber()) {
-          dedupedList.put(inputNumber, id);
-          if (oldId != null) {
-            LOG.warn("Old Src for InputIndex: " + inputNumber + " with attemptNumber: "
-                + oldId.getAttemptNumber()
-                + " was not determined to be invalid. Ignoring it for now in favour of "
-                + id.getAttemptNumber());
-          }
-        }
-      } else {
-        LOG.info("Ignoring finished or obsolete source: " + id);
-      }
-    }
-    
-    // Compute the final list, limited by NUM_FETCHERS_AT_ONCE
-    List<InputAttemptIdentifier> result = new ArrayList<InputAttemptIdentifier>();
-    int includedMaps = 0;
-    int totalSize = dedupedList.size();
-    Iterator<Map.Entry<Integer, InputAttemptIdentifier>> dedupedItr = dedupedList.entrySet().iterator();
-    // find the maps that we still need, up to the limit
-    while (dedupedItr.hasNext()) {
-      InputAttemptIdentifier id = dedupedItr.next().getValue();
-      result.add(id);
-      if (++includedMaps >= maxTaskOutputAtOnce) {
-        break;
-      }
-    }
-
-    // put back the maps left after the limit
-    while (dedupedItr.hasNext()) {
-      InputAttemptIdentifier id = dedupedItr.next().getValue();
-      host.addKnownMap(id);
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("assigned " + includedMaps + " of " + totalSize + " to " +
-          host + " to " + Thread.currentThread().getName());
-    }
-    return result;
-  }
-
-  public synchronized void freeHost(MapHost host) {
-    if (host.getState() != MapHost.State.PENALIZED) {
-      if (host.markAvailable() == MapHost.State.PENDING) {
-        pendingHosts.add(host);
-        notifyAll();
-      }
-    }
-    LOG.info(host + " freed by " + Thread.currentThread().getName() + " in " + 
-             (System.currentTimeMillis()-shuffleStart.get()) + "ms");
-  }
-
-  public synchronized void resetKnownMaps() {
-    mapLocations.clear();
-    obsoleteInputs.clear();
-    pendingHosts.clear();
-    pathToIdentifierMap.clear();
-  }
-
-  /**
-   * Utility method to check if the Shuffle data fetch is complete.
-   * @return
-   */
-  public synchronized boolean isDone() {
-    return remainingMaps == 0;
-  }
-
-  /**
-   * Wait until the shuffle finishes or until the timeout.
-   * @param millis maximum wait time
-   * @return true if the shuffle is done
-   * @throws InterruptedException
-   */
-  public synchronized boolean waitUntilDone(int millis
-                                            ) throws InterruptedException {
-    if (remainingMaps > 0) {
-      wait(millis);
-      return remainingMaps == 0;
-    }
-    return true;
-  }
-  
-  /**
-   * A structure that records the penalty for a host.
-   */
-  private static class Penalty implements Delayed {
-    MapHost host;
-    private long endTime;
-    
-    Penalty(MapHost host, long delay) {
-      this.host = host;
-      this.endTime = System.currentTimeMillis() + delay;
-    }
-
-    public long getDelay(TimeUnit unit) {
-      long remainingTime = endTime - System.currentTimeMillis();
-      return unit.convert(remainingTime, TimeUnit.MILLISECONDS);
-    }
-
-    public int compareTo(Delayed o) {
-      long other = ((Penalty) o).endTime;
-      return endTime == other ? 0 : (endTime < other ? -1 : 1);
-    }
-    
-  }
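
Penalty implements java.util.concurrent.Delayed so that penalized hosts can be parked in a delay queue and handed back only once their back-off has expired, which is what the Referee thread below relies on when it blocks on penalties.take(). A self-contained sketch of that pattern, with HostPenalty as a hypothetical stand-in for the Tez class:

    import java.util.concurrent.DelayQueue;
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;

    class HostPenalty implements Delayed {
      final String host;
      private final long endTime;                  // absolute time at which the penalty expires

      HostPenalty(String host, long delayMillis) {
        this.host = host;
        this.endTime = System.currentTimeMillis() + delayMillis;
      }

      @Override
      public long getDelay(TimeUnit unit) {
        return unit.convert(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
      }

      @Override
      public int compareTo(Delayed o) {
        return Long.compare(endTime, ((HostPenalty) o).endTime);
      }

      public static void main(String[] args) throws InterruptedException {
        DelayQueue<HostPenalty> penalties = new DelayQueue<>();
        penalties.add(new HostPenalty("node-1:13562", 200));
        // take() blocks until the 200 ms penalty expires, then hands the host back.
        System.out.println("re-enabled " + penalties.take().host);
      }
    }
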
-  
-  private String getIdentifierFromPathAndReduceId(String path, int reduceId) {
-    return path + "_" + reduceId;
-  }
-  
-  /**
-   * A thread that takes hosts off of the penalty list when the timer expires.
-   */
-  private class Referee extends Thread {
-    public Referee() {
-      setName("ShufflePenaltyReferee ["
-          + TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName()) + "]");
-      setDaemon(true);
-    }
-
-    public void run() {
-      try {
-        while (true) {
-          // take the first host that has an expired penalty
-          MapHost host = penalties.take().host;
-          synchronized (ShuffleScheduler.this) {
-            if (host.markAvailable() == MapHost.State.PENDING) {
-              pendingHosts.add(host);
-              ShuffleScheduler.this.notifyAll();
-            }
-          }
-        }
-      } catch (InterruptedException ie) {
-        return;
-      } catch (Throwable t) {
-        shuffle.reportException(t);
-      }
-    }
-  }
-  
-  public void close() throws InterruptedException {
-    /// ZZZ need to interrupt self?
-    referee.interrupt();
-    referee.join();
-  }
-
-  public synchronized void informMaxMapRunTime(int duration) {
-    if (duration > maxMapRuntime) {
-      maxMapRuntime = duration;
-    }
-  }
-  
-  void setInputFinished(int inputIndex) {
-    synchronized(finishedMaps) {
-      finishedMaps[inputIndex] = true;
-    }
-  }
-  
-  boolean isInputFinished(int inputIndex) {
-    synchronized (finishedMaps) {
-      return finishedMaps[inputIndex];      
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/196bcca9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/SimpleFetchedInputAllocator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/SimpleFetchedInputAllocator.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/SimpleFetchedInputAllocator.java
new file mode 100644
index 0000000..7e568f9
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/SimpleFetchedInputAllocator.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.shuffle.impl;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.tez.common.TezRuntimeFrameworkConfigs;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.Constants;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
+import org.apache.tez.runtime.library.common.shuffle.DiskFetchedInput;
+import org.apache.tez.runtime.library.common.shuffle.FetchedInput;
+import org.apache.tez.runtime.library.common.shuffle.FetchedInput.Type;
+import org.apache.tez.runtime.library.common.shuffle.FetchedInputAllocator;
+import org.apache.tez.runtime.library.common.shuffle.FetchedInputCallback;
+import org.apache.tez.runtime.library.common.shuffle.MemoryFetchedInput;
+
+
+/**
+ * Usage: Create instance, setInitialMemoryAvailable(long), configureAndStart()
+ *
+ */
+@Private
+public class SimpleFetchedInputAllocator implements FetchedInputAllocator,
+    FetchedInputCallback {
+
+  private static final Log LOG = LogFactory.getLog(SimpleFetchedInputAllocator.class);
+  
+  private final Configuration conf;
+
+  private final TezTaskOutputFiles fileNameAllocator;
+  private final LocalDirAllocator localDirAllocator;
+
+  // Configuration parameters
+  private final long memoryLimit;
+  private final long maxSingleShuffleLimit;
+
+  private final long maxAvailableTaskMemory;
+  private final long initialMemoryAvailable;
+  
+  private volatile long usedMemory = 0;
+
+  public SimpleFetchedInputAllocator(String uniqueIdentifier, Configuration conf,
+      long maxTaskAvailableMemory, long memoryAvailable) {
+    this.conf = conf;    
+    this.maxAvailableTaskMemory = maxTaskAvailableMemory;
+    this.initialMemoryAvailable = memoryAvailable;
+    
+    this.fileNameAllocator = new TezTaskOutputFiles(conf,
+        uniqueIdentifier);
+    this.localDirAllocator = new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
+    
+    // Setup configuration
+    final float maxInMemCopyUse = conf.getFloat(
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT_DEFAULT);
+    if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
+      throw new IllegalArgumentException("Invalid value for "
+          + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT + ": "
+          + maxInMemCopyUse);
+    }
+    
+    long memReq = (long) (conf.getLong(Constants.TEZ_RUNTIME_TASK_MEMORY,
+        Math.min(maxAvailableTaskMemory, Integer.MAX_VALUE)) * maxInMemCopyUse);
+    
+    if (memReq <= this.initialMemoryAvailable) {
+      this.memoryLimit = memReq;
+    } else {
+      this.memoryLimit = initialMemoryAvailable;
+    }
+
+    LOG.info("RequestedMem=" + memReq + ", Allocated: " + this.memoryLimit);
+    
+    final float singleShuffleMemoryLimitPercent = conf.getFloat(
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT_DEFAULT);
+    if (singleShuffleMemoryLimitPercent <= 0.0f
+        || singleShuffleMemoryLimitPercent > 1.0f) {
+      throw new IllegalArgumentException("Invalid value for "
+          + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT + ": "
+          + singleShuffleMemoryLimitPercent);
+    }
+
+    this.maxSingleShuffleLimit = (long) (memoryLimit * singleShuffleMemoryLimitPercent);
+    
+    LOG.info("SimpleInputManager -> " + "MemoryLimit: " + 
+        this.memoryLimit + ", maxSingleMemLimit: " + this.maxSingleShuffleLimit);
+  }
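
To make the sizing arithmetic in the constructor concrete: the requested memory is the configured task memory (defaulting to the available task memory, capped at Integer.MAX_VALUE) scaled by the fetch-buffer percentage, then capped by what the memory manager actually granted; the single-shuffle limit is a further fraction of that. A small sketch with purely illustrative numbers, not the configured defaults:

    class ShuffleMemorySketch {
      public static void main(String[] args) {
        // Illustrative numbers only; the real values come from TezRuntimeConfiguration
        // and the framework's memory distribution.
        long maxAvailableTaskMemory = 1024L << 20;     // 1 GiB available to the task
        long initialMemoryAvailable = 512L << 20;      // what the memory manager granted
        float maxInMemCopyUse = 0.70f;                 // fetch-buffer percentage
        float singleShuffleLimitPercent = 0.25f;       // single-shuffle percentage

        long memReq = (long) (Math.min(maxAvailableTaskMemory, Integer.MAX_VALUE) * maxInMemCopyUse);
        long memoryLimit = Math.min(memReq, initialMemoryAvailable);                   // 512 MiB here
        long maxSingleShuffleLimit = (long) (memoryLimit * singleShuffleLimitPercent); // 128 MiB here

        System.out.println("memReq=" + (memReq >> 20) + "MB, memoryLimit=" + (memoryLimit >> 20)
            + "MB, maxSingleShuffleLimit=" + (maxSingleShuffleLimit >> 20) + "MB");
      }
    }
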
+
+  @Private
+  public static long getInitialMemoryReq(Configuration conf, long maxAvailableTaskMemory) {
+    final float maxInMemCopyUse = conf.getFloat(
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT_DEFAULT);
+    if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
+      throw new IllegalArgumentException("Invalid value for "
+          + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT + ": "
+          + maxInMemCopyUse);
+    }
+    long memReq = (long) (conf.getLong(Constants.TEZ_RUNTIME_TASK_MEMORY,
+        Math.min(maxAvailableTaskMemory, Integer.MAX_VALUE)) * maxInMemCopyUse);
+    return memReq;
+  }
+
+  @Override
+  public synchronized FetchedInput allocate(long actualSize, long compressedSize,
+      InputAttemptIdentifier inputAttemptIdentifier) throws IOException {
+    if (actualSize > maxSingleShuffleLimit
+        || this.usedMemory + actualSize > this.memoryLimit) {
+      return new DiskFetchedInput(actualSize, compressedSize,
+          inputAttemptIdentifier, this, conf, localDirAllocator,
+          fileNameAllocator);
+    } else {
+      this.usedMemory += actualSize;
+      LOG.info("Used memory after allocating " + actualSize  + " : " + usedMemory);
+      return new MemoryFetchedInput(actualSize, compressedSize, inputAttemptIdentifier, this);
+    }
+  }
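
allocate() sends an input to disk when it is larger than maxSingleShuffleLimit or would push usedMemory past memoryLimit; otherwise it reserves memory for it. A small sketch of that decision with hypothetical sizes:

    class AllocateDecisionSketch {
      public static void main(String[] args) {
        // Hypothetical limits matching the sizing sketch above; not configured defaults.
        long memoryLimit = 512L << 20;
        long maxSingleShuffleLimit = 128L << 20;
        long usedMemory = 0;

        long[] requests = {16L << 20, 200L << 20, 500L << 20};
        for (long size : requests) {
          // Same test as allocate(): oversized or non-fitting inputs go to disk.
          boolean toDisk = size > maxSingleShuffleLimit || usedMemory + size > memoryLimit;
          System.out.println((size >> 20) + " MiB -> "
              + (toDisk ? "DiskFetchedInput" : "MemoryFetchedInput"));
          if (!toDisk) {
            usedMemory += size;                    // mirrors the reservation in allocate()
          }
        }
      }
    }
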
+
+  @Override
+  public synchronized FetchedInput allocateType(Type type, long actualSize,
+      long compressedSize, InputAttemptIdentifier inputAttemptIdentifier)
+      throws IOException {
+
+    switch (type) {
+    case DISK:
+      return new DiskFetchedInput(actualSize, compressedSize,
+          inputAttemptIdentifier, this, conf, localDirAllocator,
+          fileNameAllocator);
+    default:
+      return allocate(actualSize, compressedSize, inputAttemptIdentifier);
+    }
+  }
+
+  @Override
+  public synchronized void fetchComplete(FetchedInput fetchedInput) {
+    switch (fetchedInput.getType()) {
+    // Not tracking anything here.
+    case DISK:
+    case DISK_DIRECT:
+    case MEMORY:
+      break;
+    default:
+      throw new TezUncheckedException("InputType: " + fetchedInput.getType()
+          + " not expected for Broadcast fetch");
+    }
+  }
+
+  @Override
+  public synchronized void fetchFailed(FetchedInput fetchedInput) {
+    cleanup(fetchedInput);
+  }
+
+  @Override
+  public synchronized void freeResources(FetchedInput fetchedInput) {
+    cleanup(fetchedInput);
+  }
+
+  private void cleanup(FetchedInput fetchedInput) {
+    switch (fetchedInput.getType()) {
+    case DISK:
+      break;
+    case MEMORY:
+      unreserve(fetchedInput.getActualSize());
+      break;
+    default:
+      throw new TezUncheckedException("InputType: " + fetchedInput.getType()
+          + " not expected for Broadcast fetch");
+    }
+  }
+
+  private synchronized void unreserve(long size) {
+    this.usedMemory -= size;
+    LOG.info("Used memory after freeing " + size  + " : " + usedMemory);
+  }
+
+}
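
Taken together, the callback methods implement a simple reservation scheme: only MEMORY inputs count against usedMemory, and that reservation is returned when the fetch fails or the reader releases the input; DISK inputs are never tracked. A standalone model of that accounting, not the Tez classes themselves:

    class ReservationSketch {
      private long usedMemory = 0;

      long allocateInMemory(long size) {           // the MEMORY branch of allocate()
        usedMemory += size;
        return size;
      }

      void release(long size) {                    // cleanup()/unreserve() for MEMORY inputs
        usedMemory -= size;
      }

      public static void main(String[] args) {
        ReservationSketch alloc = new ReservationSketch();
        long s = alloc.allocateInMemory(64L << 20);
        System.out.println("used=" + (alloc.usedMemory >> 20) + "MB");   // 64
        alloc.release(s);                          // reader finished with the fetched input
        System.out.println("used=" + (alloc.usedMemory >> 20) + "MB");   // 0
      }
    }
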

http://git-wip-us.apache.org/repos/asf/tez/blob/196bcca9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ExceptionReporter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ExceptionReporter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ExceptionReporter.java
new file mode 100644
index 0000000..8739dd2
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ExceptionReporter.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
+
+/**
+ * An interface for reporting exceptions to other threads
+ */
+interface ExceptionReporter {
+  void reportException(Throwable t);
+}
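
A reporter like this is typically backed by something that records the first failure seen on a worker thread so the owning thread can rethrow it later. A minimal sketch with the same shape as the interface; CapturingReporter is a hypothetical example, not the Tez implementation:

    import java.util.concurrent.atomic.AtomicReference;

    class CapturingReporter {
      private final AtomicReference<Throwable> first = new AtomicReference<>();

      void reportException(Throwable t) {
        first.compareAndSet(null, t);              // remember only the first failure
      }

      void rethrowIfFailed() throws Exception {
        Throwable t = first.get();
        if (t != null) {
          throw new Exception("shuffle worker failed", t);
        }
      }

      public static void main(String[] args) throws Exception {
        CapturingReporter reporter = new CapturingReporter();
        Thread worker = new Thread(() -> reporter.reportException(new RuntimeException("fetch error")));
        worker.start();
        worker.join();
        reporter.rethrowIfFailed();                // surfaces the worker's failure on this thread
      }
    }
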

