tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [1/2] TEZ-911. Re-factor BroadcastShuffle related code to be independent of Broadcast. (sseth)
Date Thu, 06 Mar 2014 22:29:15 GMT
Repository: incubator-tez
Updated Branches:
  refs/heads/master 351a61058 -> 0df108154


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0df10815/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
new file mode 100644
index 0000000..6b91558
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
@@ -0,0 +1,706 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.shuffle.common.impl;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import javax.crypto.SecretKey;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TezInputContext;
+import org.apache.tez.runtime.api.events.InputReadErrorEvent;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.InputIdentifier;
+import org.apache.tez.runtime.library.common.TezRuntimeUtils;
+import org.apache.tez.runtime.library.shuffle.common.FetchResult;
+import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
+import org.apache.tez.runtime.library.shuffle.common.FetchedInputAllocator;
+import org.apache.tez.runtime.library.shuffle.common.Fetcher;
+import org.apache.tez.runtime.library.shuffle.common.FetcherCallback;
+import org.apache.tez.runtime.library.shuffle.common.InputHost;
+import org.apache.tez.runtime.library.shuffle.common.ShuffleEventHandler;
+import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
+import org.apache.tez.runtime.library.shuffle.common.FetchedInput.Type;
+import org.apache.tez.runtime.library.shuffle.common.Fetcher.FetcherBuilder;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Fetches the outputs of {@code numInputs} source inputs and exposes them, in completion order,
+ * through {@link #getNextInput()}.
+ * <p>
+ * Usage: construct; optionally call {@link #setIfileParameters} / {@link #setCompressionCodec};
+ * configure the required {@link ShuffleEventHandler} and {@link FetchedInputAllocator}; then call
+ * {@link #run()}. {@link #run()} starts a single scheduler thread which builds {@link Fetcher}s for
+ * hosts with pending inputs and runs them on a bounded fetcher pool. Call {@link #shutdown()} to
+ * stop scheduling and interrupt running fetchers.
+ */
+public class ShuffleManager implements FetcherCallback {
+
+  private static final Log LOG = LogFactory.getLog(ShuffleManager.class);
+
+  private final TezInputContext inputContext;
+  private final Configuration conf;
+  private final int numInputs;
+
+  private ShuffleEventHandler inputEventHandler;
+  private FetchedInputAllocator inputManager;
+
+  private ExecutorService fetcherRawExecutor;
+  private ListeningExecutorService fetcherExecutor;
+
+  private ExecutorService schedulerRawExecutor;
+  private ListeningExecutorService schedulerExecutor;
+  private final RunShuffleCallable schedulerCallable = new RunShuffleCallable();
+
+  // Committed inputs, in completion order; drained by getNextInput().
+  private final BlockingQueue<FetchedInput> completedInputs;
+  private final AtomicBoolean inputReadyNotificationSent = new AtomicBoolean(false);
+  // Identifiers of committed inputs; used to discard duplicate attempts of the same input.
+  private final Set<InputIdentifier> completedInputSet;
+  private final ConcurrentMap<String, InputHost> knownSrcHosts;
+  private final BlockingQueue<InputHost> pendingHosts;
+  private final Set<InputAttemptIdentifier> obsoletedInputs;
+
+  private final AtomicInteger numCompletedInputs = new AtomicInteger(0);
+
+  private long startTime;
+  private long lastProgressTime;
+
+  // Required to be held when manipulating pendingHosts
+  private final ReentrantLock lock = new ReentrantLock();
+  private final Condition wakeLoop = lock.newCondition();
+
+  private int numFetchers;
+  private final AtomicInteger numRunningFetchers = new AtomicInteger(0);
+
+  // Parameters required by Fetchers
+  private SecretKey shuffleSecret;
+  private int connectionTimeout;
+  private int readTimeout;
+  private CompressionCodec codec;
+
+  private int ifileBufferSize;
+  private boolean ifileReadAhead;
+  private int ifileReadAheadLength;
+
+  private final FetchFutureCallback fetchFutureCallback = new FetchFutureCallback();
+
+  // Set by a failed fetcher; makes the scheduler loop exit.
+  private volatile Throwable shuffleError;
+
+  private final AtomicBoolean isShutdown = new AtomicBoolean(false);
+
+  private final TezCounter shuffledInputsCounter;
+  private final TezCounter failedShufflesCounter;
+  private final TezCounter bytesShuffledCounter;
+  private final TezCounter decompressedDataSizeCounter;
+  private final TezCounter bytesShuffledToDiskCounter;
+  private final TezCounter bytesShuffledToMemCounter;
+
+  // TODO More counters - FetchErrors, speed?
+
+  /**
+   * @param inputContext context of the owning input; used for counters, events and error reporting
+   * @param conf configuration from which shuffle parameters are read when {@link #run()} is called
+   * @param numInputs number of distinct source inputs that must complete for the shuffle to finish
+   * @throws IOException declared for compatibility; the current body does not throw it
+   */
+  public ShuffleManager(TezInputContext inputContext, Configuration conf, int numInputs)
+      throws IOException {
+    this.inputContext = inputContext;
+    this.conf = conf;
+    this.numInputs = numInputs;
+
+    // Created eagerly (rather than in run()) so that event-handler methods invoked before run()
+    // do not dereference uninitialized collections.
+    this.completedInputSet = Collections.newSetFromMap(
+        new ConcurrentHashMap<InputIdentifier, Boolean>(numInputs));
+    // LinkedBlockingQueue rejects a capacity of 0; clamp so that numInputs == 0 is supported.
+    this.completedInputs = new LinkedBlockingQueue<FetchedInput>(Math.max(1, numInputs));
+    this.knownSrcHosts = new ConcurrentHashMap<String, InputHost>();
+    this.pendingHosts = new LinkedBlockingQueue<InputHost>();
+    this.obsoletedInputs = Collections.newSetFromMap(
+        new ConcurrentHashMap<InputAttemptIdentifier, Boolean>());
+
+    this.shuffledInputsCounter = inputContext.getCounters().findCounter(TaskCounter.NUM_SHUFFLED_INPUTS);
+    this.failedShufflesCounter = inputContext.getCounters().findCounter(TaskCounter.NUM_FAILED_SHUFFLE_INPUTS);
+    this.bytesShuffledCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES);
+    this.decompressedDataSizeCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_DECOMPRESSED);
+    this.bytesShuffledToDiskCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_TO_DISK);
+    this.bytesShuffledToMemCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_TO_MEM);
+  }
+
+  /** Sets the IFile read parameters handed to each {@link Fetcher}. Call before {@link #run()}. */
+  public void setIfileParameters(int bufferSize, boolean readAhead, int readAheadLength) {
+    this.ifileBufferSize = bufferSize;
+    this.ifileReadAhead = readAhead;
+    this.ifileReadAheadLength = readAheadLength;
+  }
+
+  /** Sets the codec handed to each {@link Fetcher}; {@code null} means no compression. */
+  public void setCompressionCodec(CompressionCodec codec) {
+    this.codec = codec;
+  }
+
+  /** Sets the handler that {@link #handleEvents(List)} delegates to. Required before run(). */
+  public void setInputEventHandler(ShuffleEventHandler eventHandler) {
+    this.inputEventHandler = eventHandler;
+  }
+
+  /** Sets the allocator used by fetchers for fetched inputs. Required before run(). */
+  public void setFetchedInputAllocator(FetchedInputAllocator allocator) {
+    this.inputManager = allocator;
+  }
+
+
+  /**
+   * Reads the shuffle configuration and creates the fetcher and scheduler thread pools. Called
+   * from {@link #run()}.
+   */
+  private void configureAndStart() throws IOException {
+    int maxConfiguredFetchers =
+        conf.getInt(
+            TezJobConfig.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES,
+            TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES);
+
+    // Never run more fetchers than there are inputs, but keep at least one thread: a fixed
+    // thread pool cannot be created with zero threads (numInputs == 0 or a non-positive config).
+    this.numFetchers = Math.max(1, Math.min(maxConfiguredFetchers, numInputs));
+
+    this.fetcherRawExecutor = Executors.newFixedThreadPool(
+        numFetchers,
+        new ThreadFactoryBuilder()
+            .setDaemon(true)
+            .setNameFormat(
+                "Fetcher [" + inputContext.getUniqueIdentifier() + "] #%d")
+            .build());
+    this.fetcherExecutor = MoreExecutors.listeningDecorator(fetcherRawExecutor);
+
+    this.schedulerRawExecutor = Executors.newFixedThreadPool(
+        1,
+        new ThreadFactoryBuilder()
+            .setDaemon(true)
+            .setNameFormat(
+                "ShuffleRunner [" + inputContext.getUniqueIdentifier() + "]")
+            .build());
+    this.schedulerExecutor = MoreExecutors.listeningDecorator(schedulerRawExecutor);
+
+    this.startTime = System.currentTimeMillis();
+    this.lastProgressTime = startTime;
+
+    this.shuffleSecret = ShuffleUtils
+        .getJobTokenSecretFromTokenBytes(inputContext
+            .getServiceConsumerMetaData(TezConfiguration.TEZ_SHUFFLE_HANDLER_SERVICE_ID));
+
+    // NOTE(review): the stalled-copy timeout default is used as the connect-timeout default;
+    // presumably intentional - confirm.
+    this.connectionTimeout = conf.getInt(
+        TezJobConfig.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT,
+        TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_STALLED_COPY_TIMEOUT);
+    this.readTimeout = conf.getInt(
+        TezJobConfig.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT,
+        TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT);
+
+    LOG.info(this.getClass().getSimpleName() + " : numInputs=" + numInputs + ", compressionCodec="
+        + (codec == null ? "NoCompressionCodec" : codec.getClass().getName()) + ", numFetchers="
+        + numFetchers + ", ifileBufferSize=" + ifileBufferSize + ", ifileReadAheadEnabled="
+        + ifileReadAhead + ", ifileReadAheadLength=" + ifileReadAheadLength);
+  }
+
+  /**
+   * Starts the shuffle: configures the executors and submits the scheduler loop to its own
+   * thread. Returns without waiting for any input to be fetched.
+   *
+   * @throws IllegalStateException if the fetched-input allocator or the input event handler has
+   *     not been configured
+   * @throws IOException propagated from configuration (e.g. reading the shuffle secret)
+   */
+  public void run() throws IOException {
+    Preconditions.checkState(inputManager != null, "InputManager must be configured");
+    Preconditions.checkState(inputEventHandler != null, "InputEventHandler must be configured");
+
+    configureAndStart();
+    ListenableFuture<Void> runShuffleFuture = schedulerExecutor.submit(schedulerCallable);
+    Futures.addCallback(runShuffleFuture, new SchedulerFutureCallback());
+    // Shutdown this executor once this task, and the callback complete.
+    schedulerExecutor.shutdown();
+  }
+
+  /**
+   * Scheduler loop. Waits until a fetcher slot is free and some host is pending, then builds and
+   * submits fetchers for pending hosts without exceeding {@code numFetchers} running fetchers.
+   * Exits once all inputs are complete, on shutdown, or after a fetcher reported an error.
+   */
+  private class RunShuffleCallable implements Callable<Void> {
+
+    @Override
+    public Void call() throws Exception {
+      while (!isShutdown.get() && numCompletedInputs.get() < numInputs) {
+        lock.lock();
+        try {
+          if (numRunningFetchers.get() >= numFetchers || pendingHosts.size() == 0) {
+            if (numCompletedInputs.get() < numInputs) {
+              // Signalled when hosts/inputs are added, inputs complete or fetchers finish;
+              // interrupted by shutdown(). Every condition is re-checked after waking.
+              wakeLoop.await();
+            }
+          }
+        } finally {
+          lock.unlock();
+        }
+
+        if (shuffleError != null) {
+          // InputContext has already been informed of a fatal error. Relying on
+          // tez to kill the task.
+          break;
+        }
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("NumCompletedInputs: " + numCompletedInputs);
+        }
+        if (numCompletedInputs.get() < numInputs) {
+          lock.lock();
+          try {
+            // May be <= 0 if we were woken by a new host while all fetcher slots are busy; in
+            // that case nothing is scheduled and the outer loop waits again.
+            int maxFetchersToRun = numFetchers - numRunningFetchers.get();
+            int count = 0;
+            while (count < maxFetchersToRun && pendingHosts.peek() != null) {
+              InputHost inputHost = null;
+              try {
+                inputHost = pendingHosts.take();
+              } catch (InterruptedException e) {
+                if (isShutdown.get()) {
+                  LOG.info("Interrupted and hasBeenShutdown, Breaking out of ShuffleScheduler Loop");
+                  break;
+                } else {
+                  throw e;
+                }
+              }
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Processing pending host: " + inputHost.toDetailedString());
+              }
+              if (inputHost.getNumPendingInputs() > 0) {
+                if (isShutdown.get()) {
+                  // Do not start new fetchers once shutdown has been requested.
+                  LOG.info("hasBeenShutdown, Breaking out of ShuffleScheduler Loop");
+                  break;
+                }
+                LOG.info("Scheduling fetch for inputHost: " + inputHost.getHost());
+                Fetcher fetcher = constructFetcherForHost(inputHost);
+                numRunningFetchers.incrementAndGet();
+                ListenableFuture<FetchResult> future = fetcherExecutor
+                    .submit(fetcher);
+                Futures.addCallback(future, fetchFutureCallback);
+                count++;
+              } else {
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("Skipping host: " + inputHost.getHost()
+                      + " since it has no inputs to process");
+                }
+              }
+            }
+          } finally {
+            lock.unlock();
+          }
+        }
+      }
+      LOG.info("Shutting down FetchScheduler, Was Interrupted: " + Thread.currentThread().isInterrupted());
+      // TODO NEWTEZ Maybe clean up inputs.
+      if (!fetcherExecutor.isShutdown()) {
+        fetcherExecutor.shutdownNow();
+      }
+      return null;
+    }
+  }
+
+  /**
+   * Builds a fetcher for all inputs currently pending on {@code inputHost}. This takes them from
+   * the host via {@code clearAndGetPendingInputs()}. Inputs that have already completed or that
+   * were marked obsolete are filtered out; obsolete entries are consumed from
+   * {@code obsoletedInputs}.
+   */
+  private Fetcher constructFetcherForHost(InputHost inputHost) {
+    FetcherBuilder fetcherBuilder = new FetcherBuilder(
+        ShuffleManager.this, inputManager,
+        inputContext.getApplicationId(), shuffleSecret, conf);
+    fetcherBuilder.setConnectionParameters(connectionTimeout, readTimeout);
+    if (codec != null) {
+      fetcherBuilder.setCompressionParameters(codec);
+    }
+    fetcherBuilder.setIFileParams(ifileReadAhead, ifileReadAheadLength);
+
+    List<InputAttemptIdentifier> pendingInputsForHost = inputHost
+        .clearAndGetPendingInputs();
+    for (Iterator<InputAttemptIdentifier> inputIter = pendingInputsForHost
+        .iterator(); inputIter.hasNext();) {
+      InputAttemptIdentifier input = inputIter.next();
+      // Consume the OBSOLETE marker (if any) regardless of whether the input has completed.
+      boolean obsolete = obsoletedInputs.remove(input);
+      // Skip attempts which are obsolete or whose input has already completed. A single
+      // remove() is required: calling Iterator.remove() twice throws IllegalStateException.
+      if (obsolete || completedInputSet.contains(input.getInputIdentifier())) {
+        inputIter.remove();
+      }
+    }
+    // TODO NEWTEZ Maybe limit the number of inputs being given to a single
+    // fetcher, especially in the case where #hosts < #fetchers
+    fetcherBuilder.assignWork(inputHost.getHost(), inputHost.getPort(), 0,
+        pendingInputsForHost);
+    LOG.info("Created Fetcher for host: " + inputHost.getHost()
+        + ", with inputs: " + pendingInputsForHost);
+    return fetcherBuilder.build();
+  }
+
+  /////////////////// Methods for InputEventHandler
+
+  /**
+   * Records that {@code srcAttemptIdentifier} can be fetched from {@code hostName:port}. The host
+   * is queued for scheduling and the scheduler loop is woken.
+   *
+   * @param partition currently unused by this method
+   * @throws TezUncheckedException if the host cannot be added to the pending queue
+   */
+  public void addKnownInput(String hostName, int port,
+      InputAttemptIdentifier srcAttemptIdentifier, int partition) {
+    InputHost host = knownSrcHosts.get(hostName);
+    if (host == null) {
+      host = new InputHost(hostName, port, inputContext.getApplicationId());
+      InputHost old = knownSrcHosts.putIfAbsent(hostName, host);
+      if (old != null) {
+        host = old;
+      }
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Adding input: " + srcAttemptIdentifier + ", to host: " + host);
+    }
+    host.addKnownInput(srcAttemptIdentifier);
+    lock.lock();
+    try {
+      boolean added = pendingHosts.offer(host);
+      if (!added) {
+        String errorMessage = "Unable to add host: " + host.getHost() + " to pending queue";
+        LOG.error(errorMessage);
+        throw new TezUncheckedException(errorMessage);
+      }
+      wakeLoop.signal();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Marks an input as complete without any data by registering a placeholder. The placeholder is
+   * skipped by {@link #getNextInput()}. Duplicate notifications for the same input are ignored.
+   */
+  public void addCompletedInputWithNoData(
+      InputAttemptIdentifier srcAttemptIdentifier) {
+    InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
+    LOG.info("No input data exists for SrcTask: " + inputIdentifier + ". Marking as complete.");
+
+    if (!completedInputSet.contains(inputIdentifier)) {
+      synchronized (completedInputSet) {
+        if (!completedInputSet.contains(inputIdentifier)) {
+          registerCompletedInput(new NullFetchedInput(srcAttemptIdentifier));
+        }
+      }
+    }
+
+    // Awake the loop to check for termination.
+    lock.lock();
+    try {
+      wakeLoop.signal();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Registers input data that arrived directly, without a fetch. The first copy of an input is
+   * committed and queued; later copies of the same input are aborted.
+   *
+   * @throws IOException from {@code commit()} or {@code abort()} on the fetched input
+   */
+  public void addCompletedInputWithData(
+      InputAttemptIdentifier srcAttemptIdentifier, FetchedInput fetchedInput)
+      throws IOException {
+    InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
+
+    LOG.info("Received Data via Event: " + srcAttemptIdentifier + " to "
+        + fetchedInput.getType());
+    // Count irrespective of whether this is a copy of an already fetched input
+    lock.lock();
+    try {
+      lastProgressTime = System.currentTimeMillis();
+    } finally {
+      lock.unlock();
+    }
+
+    boolean committed = false;
+    if (!completedInputSet.contains(inputIdentifier)) {
+      synchronized (completedInputSet) {
+        if (!completedInputSet.contains(inputIdentifier)) {
+          fetchedInput.commit();
+          committed = true;
+          registerCompletedInput(fetchedInput);
+        }
+      }
+    }
+    if (!committed) {
+      fetchedInput.abort(); // If this fails, the fetcher may attempt another
+                            // abort.
+    } else {
+      lock.lock();
+      try {
+        // Signal the wakeLoop to check for termination.
+        wakeLoop.signal();
+      } finally {
+        lock.unlock();
+      }
+    }
+  }
+
+  /**
+   * Marks a source attempt as obsolete so it is dropped the next time a fetcher is built for its
+   * host. Fetchers which are already running are not informed.
+   */
+  public synchronized void obsoleteKnownInput(InputAttemptIdentifier srcAttemptIdentifier) {
+    obsoletedInputs.add(srcAttemptIdentifier);
+    // TODO NEWTEZ Maybe inform the fetcher about this. For now, this is used during the
+    // initial fetch list construction.
+  }
+
+
+  /** Forwards {@code events} to the configured {@link ShuffleEventHandler}. */
+  public void handleEvents(List<Event> events) throws IOException {
+    inputEventHandler.handleEvents(events);
+  }
+
+  /////////////////// End of Methods for InputEventHandler
+  /////////////////// Methods from FetcherCallbackHandler
+
+  /**
+   * Called by a fetcher when an input was fetched. The first copy of an input is committed,
+   * counted and queued; later copies of the same input are aborted.
+   *
+   * @throws IOException from {@code commit()} or {@code abort()} on the fetched input
+   */
+  @Override
+  public void fetchSucceeded(String host, InputAttemptIdentifier srcAttemptIdentifier,
+      FetchedInput fetchedInput, long fetchedBytes, long decompressedLength, long copyDuration)
+      throws IOException {
+    InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
+
+    LOG.info("Completed fetch for attempt: " + srcAttemptIdentifier + " to " + fetchedInput.getType());
+
+    // Count irrespective of whether this is a copy of an already fetched input
+    lock.lock();
+    try {
+      lastProgressTime = System.currentTimeMillis();
+    } finally {
+      lock.unlock();
+    }
+
+    boolean committed = false;
+    if (!completedInputSet.contains(inputIdentifier)) {
+      synchronized (completedInputSet) {
+        if (!completedInputSet.contains(inputIdentifier)) {
+          fetchedInput.commit();
+          committed = true;
+
+          // Processing counters for completed and commit fetches only. Need
+          // additional counters for excessive fetches - which primarily comes
+          // in after speculation or retries.
+          shuffledInputsCounter.increment(1);
+          bytesShuffledCounter.increment(fetchedBytes);
+          if (fetchedInput.getType() == Type.MEMORY) {
+            bytesShuffledToMemCounter.increment(fetchedBytes);
+          } else {
+            bytesShuffledToDiskCounter.increment(fetchedBytes);
+          }
+          decompressedDataSizeCounter.increment(decompressedLength);
+
+          registerCompletedInput(fetchedInput);
+        }
+      }
+    }
+    if (!committed) {
+      fetchedInput.abort(); // If this fails, the fetcher may attempt another abort.
+    } else {
+      lock.lock();
+      try {
+        // Signal the wakeLoop to check for termination.
+        wakeLoop.signal();
+      } finally {
+        lock.unlock();
+      }
+    }
+    // TODO NEWTEZ Maybe inform fetchers, in case they have an alternate attempt of the same
+    // task in their queue.
+  }
+
+  /**
+   * Called by a fetcher when fetching an input failed. The failed-shuffle counter is incremented.
+   * A known source attempt is reported through an {@link InputReadErrorEvent}. A {@code null}
+   * source attempt is reported as a fatal error.
+   */
+  @Override
+  public void fetchFailed(String host,
+      InputAttemptIdentifier srcAttemptIdentifier, boolean connectFailed) {
+    // TODO NEWTEZ. Implement logic to report fetch failures after a threshold.
+    // For now, reporting immediately.
+    LOG.info("Fetch failed for src: " + srcAttemptIdentifier
+        + ", host: " + host + ", connectFailed: "
+        + connectFailed);
+    failedShufflesCounter.increment(1);
+    if (srcAttemptIdentifier == null) {
+      String message = "Received fetchFailure for an unknown src (null)";
+      LOG.fatal(message);
+      inputContext.fatalError(null, message);
+    } else {
+      InputReadErrorEvent readError = new InputReadErrorEvent(
+          "Fetch failure while fetching from "
+              + TezRuntimeUtils.getTaskAttemptIdentifier(
+                  inputContext.getSourceVertexName(),
+                  srcAttemptIdentifier.getInputIdentifier().getInputIndex(),
+                  srcAttemptIdentifier.getAttemptNumber()),
+          srcAttemptIdentifier.getInputIdentifier().getInputIndex(),
+          srcAttemptIdentifier.getAttemptNumber());
+
+      List<Event> failedEvents = Lists.newArrayListWithCapacity(1);
+      failedEvents.add(readError);
+      inputContext.sendEvents(failedEvents);
+    }
+  }
+  /////////////////// End of Methods from FetcherCallbackHandler
+
+  /**
+   * Stops the shuffle. Marks this manager as shut down, interrupts the scheduler thread, and
+   * interrupts any running fetchers. Safe to call before {@link #run()} and more than once.
+   */
+  public void shutdown() throws InterruptedException {
+    isShutdown.set(true);
+    // run() calls schedulerExecutor.shutdown() right after submitting the scheduler task, so an
+    // isShutdown() guard here would always skip this call. shutdownNow() is safe to repeat and
+    // still interrupts the scheduler thread, which may be blocked in wakeLoop.await().
+    if (this.schedulerExecutor != null) {
+      this.schedulerExecutor.shutdownNow(); // Interrupt the scheduler thread
+    }
+    if (this.fetcherExecutor != null) {
+      this.fetcherExecutor.shutdownNow(); // Interrupt all running fetchers
+    }
+  }
+
+  /**
+   * Queues a committed (or placeholder) input and bumps the completion count. Sends the
+   * input-ready notification for the first input only. Callers in this class invoke it while
+   * synchronized on {@code completedInputSet}.
+   */
+  private void registerCompletedInput(FetchedInput fetchedInput) {
+    lock.lock();
+    try {
+      completedInputSet.add(fetchedInput.getInputAttemptIdentifier().getInputIdentifier());
+      completedInputs.add(fetchedInput);
+      if (!inputReadyNotificationSent.getAndSet(true)) {
+        // TODO Should eventually be controlled by Inputs which are processing the data.
+        inputContext.inputIsReady();
+      }
+      numCompletedInputs.incrementAndGet();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /////////////////// Methods for walking the available inputs
+
+  /**
+   * @return true if the head of the completed queue is a real (non-placeholder) input.
+   */
+  public boolean newInputAvailable() {
+    FetchedInput head = completedInputs.peek();
+    return head != null && !(head instanceof NullFetchedInput);
+  }
+
+  /**
+   * @return true if all of the required inputs have been fetched.
+   */
+  public boolean allInputsFetched() {
+    lock.lock();
+    try {
+      return numCompletedInputs.get() == numInputs;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Removes and returns the next completed input, skipping placeholders for inputs without data.
+   *
+   * @return the next available input, or null once all inputs have completed and been consumed.
+   *         This method will block if there are currently no available inputs,
+   *         but more may become available.
+   */
+  public FetchedInput getNextInput() throws InterruptedException {
+    FetchedInput input = null;
+    do {
+      // Check for no additional inputs
+      lock.lock();
+      try {
+        input = completedInputs.peek();
+        if (input == null && allInputsFetched()) {
+          break;
+        }
+      } finally {
+        lock.unlock();
+      }
+      input = completedInputs.take(); // block
+    } while (input instanceof NullFetchedInput);
+    return input;
+  }
+  /////////////////// End of methods for walking the available inputs
+
+
+  /**
+   * Fake input that is added to the completed input list in case an input does not have any
+   * data. Every data operation throws {@link UnsupportedOperationException}.
+   */
+  private static class NullFetchedInput extends FetchedInput {
+
+    public NullFetchedInput(InputAttemptIdentifier inputAttemptIdentifier) {
+      super(Type.MEMORY, -1, -1, inputAttemptIdentifier, null);
+    }
+
+    @Override
+    public OutputStream getOutputStream() throws IOException {
+      throw new UnsupportedOperationException("Not supported for NullFetchedInput");
+    }
+
+    @Override
+    public InputStream getInputStream() throws IOException {
+      throw new UnsupportedOperationException("Not supported for NullFetchedInput");
+    }
+
+    @Override
+    public void commit() throws IOException {
+      throw new UnsupportedOperationException("Not supported for NullFetchedInput");
+    }
+
+    @Override
+    public void abort() throws IOException {
+      throw new UnsupportedOperationException("Not supported for NullFetchedInput");
+    }
+
+    @Override
+    public void free() {
+      throw new UnsupportedOperationException("Not supported for NullFetchedInput");
+    }
+  }
+
+
+  /** Logs completion of the scheduler loop; reports its failure as fatal unless shut down. */
+  private class SchedulerFutureCallback implements FutureCallback<Void> {
+
+    @Override
+    public void onSuccess(Void result) {
+      LOG.info("Scheduler thread completed");
+    }
+
+    @Override
+    public void onFailure(Throwable t) {
+      if (isShutdown.get()) {
+        LOG.info("Already shutdown. Ignoring error: " + t);
+      } else {
+        LOG.error("Scheduler failed with error: ", t);
+        inputContext.fatalError(t, "Shuffle Scheduler Failed");
+      }
+    }
+
+  }
+
+  /**
+   * Completion handler for fetchers. Requeues inputs a fetcher reports as still pending and frees
+   * the fetcher slot. On error (when not shut down), records the error and reports it as fatal.
+   */
+  private class FetchFutureCallback implements FutureCallback<FetchResult> {
+
+    /** Releases one fetcher slot and wakes the scheduler loop. */
+    private void doBookKeepingForFetcherComplete() {
+      numRunningFetchers.decrementAndGet();
+      lock.lock();
+      try {
+        wakeLoop.signal();
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    @Override
+    public void onSuccess(FetchResult result) {
+      Iterable<InputAttemptIdentifier> pendingInputs = result.getPendingInputs();
+      if (pendingInputs != null && pendingInputs.iterator().hasNext()) {
+        // Re-add the inputs this fetcher reported as still pending and requeue their host.
+        InputHost inputHost = knownSrcHosts.get(result.getHost());
+        assert inputHost != null;
+        for (InputAttemptIdentifier input : pendingInputs) {
+          inputHost.addKnownInput(input);
+        }
+        lock.lock();
+        try {
+          pendingHosts.add(inputHost);
+        } finally {
+          lock.unlock();
+        }
+      }
+      doBookKeepingForFetcherComplete();
+    }
+
+    @Override
+    public void onFailure(Throwable t) {
+      if (isShutdown.get()) {
+        LOG.info("Already shutdown. Ignoring error from fetcher: " + t);
+      } else {
+        LOG.error("Fetcher failed with error: ", t);
+        shuffleError = t;
+        inputContext.fatalError(t, "Fetch failed");
+      }
+      // Always release the fetcher slot so numRunningFetchers stays accurate.
+      doBookKeepingForFetcherComplete();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0df10815/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/SimpleFetchedInputAllocator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/SimpleFetchedInputAllocator.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/SimpleFetchedInputAllocator.java
new file mode 100644
index 0000000..d8682f3
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/SimpleFetchedInputAllocator.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.shuffle.common.impl;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.runtime.library.common.Constants;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
+import org.apache.tez.runtime.library.shuffle.common.DiskFetchedInput;
+import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
+import org.apache.tez.runtime.library.shuffle.common.FetchedInputAllocator;
+import org.apache.tez.runtime.library.shuffle.common.FetchedInputCallback;
+import org.apache.tez.runtime.library.shuffle.common.MemoryFetchedInput;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Usage: Create instance, setInitialMemoryAvailable(long), configureAndStart()
+ *
+ */
+@Private
+public class SimpleFetchedInputAllocator implements FetchedInputAllocator,
+    FetchedInputCallback {
+
+  private static final Log LOG = LogFactory.getLog(SimpleFetchedInputAllocator.class);
+  
+  private final Configuration conf;
+  private final String uniqueIdentifier;
+
+  private TezTaskOutputFiles fileNameAllocator;
+  private LocalDirAllocator localDirAllocator;
+
+  // Configuration parameters
+  private long memoryLimit;
+  private long maxSingleShuffleLimit;
+
+  private volatile long usedMemory = 0;
+  
+  private long maxAvailableTaskMemory;
+  private long initialMemoryAvailable =-1l;
+
+  public SimpleFetchedInputAllocator(String uniqueIdentifier, Configuration conf, long maxTaskAvailableMemory)
{
+    this.conf = conf;    
+    this.uniqueIdentifier = uniqueIdentifier;
+    this.maxAvailableTaskMemory = maxTaskAvailableMemory;
+  }
+
+  @Private
+  public void configureAndStart() {
+    Preconditions.checkState(initialMemoryAvailable != -1,
+        "Initial memory must be configured before starting");
+    this.fileNameAllocator = new TezTaskOutputFiles(conf,
+        uniqueIdentifier);
+    this.localDirAllocator = new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
+
+    // Setup configuration
+    final float maxInMemCopyUse = conf.getFloat(
+        TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT,
+        TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT);
+    if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
+      throw new IllegalArgumentException("Invalid value for "
+          + TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT + ": "
+          + maxInMemCopyUse);
+    }
+
+    // Allow unit tests to fix Runtime memory
+    long memReq = (long) (conf.getLong(Constants.TEZ_RUNTIME_TASK_MEMORY,
+        Math.min(maxAvailableTaskMemory, Integer.MAX_VALUE)) * maxInMemCopyUse);
+    
+    if (memReq <= this.initialMemoryAvailable) {
+      this.memoryLimit = memReq;
+    } else {
+      this.memoryLimit = initialMemoryAvailable;
+    }
+
+    LOG.info("RequestedMem=" + memReq + ", Allocated: " + this.memoryLimit);
+
+    final float singleShuffleMemoryLimitPercent = conf.getFloat(
+        TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT,
+        TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT);
+    if (singleShuffleMemoryLimitPercent <= 0.0f
+        || singleShuffleMemoryLimitPercent > 1.0f) {
+      throw new IllegalArgumentException("Invalid value for "
+          + TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT + ": "
+          + singleShuffleMemoryLimitPercent);
+    }
+
+    this.maxSingleShuffleLimit = (long) (memoryLimit * singleShuffleMemoryLimitPercent);
+    
+    LOG.info("BroadcastInputManager -> " + "MemoryLimit: " + 
+    this.memoryLimit + ", maxSingleMemLimit: " + this.maxSingleShuffleLimit);
+  }
+  
+  @Private
+  public static long getInitialMemoryReq(Configuration conf, long maxAvailableTaskMemory)
{
+    final float maxInMemCopyUse = conf.getFloat(
+        TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT,
+        TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT);
+    if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
+      throw new IllegalArgumentException("Invalid value for "
+          + TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT + ": "
+          + maxInMemCopyUse);
+    }
+    long memReq = (long) (conf.getLong(Constants.TEZ_RUNTIME_TASK_MEMORY,
+        Math.min(maxAvailableTaskMemory, Integer.MAX_VALUE)) * maxInMemCopyUse);
+    return memReq;
+  }
+
+  @Private
+  public void setInitialMemoryAvailable(long available) {
+    this.initialMemoryAvailable = available;
+  }
+
+  @Override
+  public synchronized FetchedInput allocate(long actualSize, long compressedSize,
+      InputAttemptIdentifier inputAttemptIdentifier) throws IOException {
+    if (actualSize > maxSingleShuffleLimit
+        || this.usedMemory + actualSize > this.memoryLimit) {
+      return new DiskFetchedInput(actualSize, compressedSize,
+          inputAttemptIdentifier, this, conf, localDirAllocator,
+          fileNameAllocator);
+    } else {
+      this.usedMemory += actualSize;
+      LOG.info("Used memory after allocating " + actualSize  + " : " + usedMemory);
+      return new MemoryFetchedInput(actualSize, compressedSize, inputAttemptIdentifier, this);
+    }
+  }
+
+  @Override
+  public synchronized void fetchComplete(FetchedInput fetchedInput) {
+    switch (fetchedInput.getType()) {
+    // Not tracking anything here.
+    case DISK:
+    case MEMORY:
+      break;
+    default:
+      throw new TezUncheckedException("InputType: " + fetchedInput.getType()
+          + " not expected for Broadcast fetch");
+    }
+  }
+
+  @Override
+  public synchronized void fetchFailed(FetchedInput fetchedInput) {
+    cleanup(fetchedInput);
+  }
+
+  @Override
+  public synchronized void freeResources(FetchedInput fetchedInput) {
+    cleanup(fetchedInput);
+  }
+
+  private void cleanup(FetchedInput fetchedInput) {
+    switch (fetchedInput.getType()) {
+    case DISK:
+      break;
+    case MEMORY:
+      unreserve(fetchedInput.getActualSize());
+      break;
+    default:
+      throw new TezUncheckedException("InputType: " + fetchedInput.getType()
+          + " not expected for Broadcast fetch");
+    }
+  }
+
+  private synchronized void unreserve(long size) {
+    this.usedMemory -= size;
+    LOG.info("Used memory after freeing " + size  + " : " + usedMemory);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0df10815/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/broadcast/input/TestBroadcastInputManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/broadcast/input/TestBroadcastInputManager.java
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/broadcast/input/TestBroadcastInputManager.java
deleted file mode 100644
index f07c5ac..0000000
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/broadcast/input/TestBroadcastInputManager.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.runtime.library.broadcast.input;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.util.UUID;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
-import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
-import org.junit.Test;
-
-/**
- * Checks that BroadcastInputManager hands out MEMORY allocations while the in-memory
- * budget allows, DISK allocations once it is exhausted, and that freeing a MEMORY
- * allocation (but not a DISK one) makes room for another in-memory allocation.
- */
-public class TestBroadcastInputManager {
-
-  private static final Log LOG = LogFactory.getLog(TestBroadcastInputManager.class);
-  
-  // NOTE(review): the budget below is derived from the JVM max heap. If
-  // BroadcastInputManager caps task memory at Integer.MAX_VALUE (as
-  // SimpleFetchedInputAllocator does), these expectations can fail on JVMs with a
-  // large max heap - confirm.
-  @Test
-  public void testInMemAllocation() throws IOException {
-    String localDirs = "/tmp/" + this.getClass().getName();
-    Configuration conf = new Configuration();
-    
-    long jvmMax = Runtime.getRuntime().maxMemory();
-    LOG.info("jvmMax: " + jvmMax);
-    
-    float bufferPercent = 0.1f;
-    conf.setFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, bufferPercent);
-    conf.setFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT, 1.0f);
-    conf.setStrings(TezJobConfig.LOCAL_DIRS, localDirs);
-    
-    long inMemThreshold = (long) (bufferPercent * jvmMax);
-    LOG.info("InMemThreshold: " + inMemThreshold);
-
-    BroadcastInputManager inputManager = new BroadcastInputManager(UUID.randomUUID().toString(),
-        conf, Runtime.getRuntime().maxMemory());
-    inputManager.setInitialMemoryAvailable(inMemThreshold);
-    inputManager.configureAndStart();
-
-    // Each request is 40% of the threshold: two fit in memory, a third does not.
-    long requestSize = (long) (0.4f * inMemThreshold);
-    long compressedSize = 1l;
-    LOG.info("RequestSize: " + requestSize);
-    
-    FetchedInput fi1 = inputManager.allocate(requestSize, compressedSize, new InputAttemptIdentifier(1,
1));
-    assertEquals(FetchedInput.Type.MEMORY, fi1.getType());
-    
-    
-    FetchedInput fi2 = inputManager.allocate(requestSize, compressedSize, new InputAttemptIdentifier(2,
1));
-    assertEquals(FetchedInput.Type.MEMORY, fi2.getType());
-    
-    
-    // Over limit by this point. Next reserve should give back a DISK allocation
-    FetchedInput fi3 = inputManager.allocate(requestSize, compressedSize, new InputAttemptIdentifier(3,
1));
-    assertEquals(FetchedInput.Type.DISK, fi3.getType());
-    
-    
-    // Freed one memory allocation. Next should be mem again.
-    fi1.abort();
-    fi1.free();
-    FetchedInput fi4 = inputManager.allocate(requestSize, compressedSize, new InputAttemptIdentifier(4,
1));
-    assertEquals(FetchedInput.Type.MEMORY, fi4.getType());
-    
-    // Freed one disk allocation. Next should be disk again (no mem freed)
-    fi3.abort();
-    fi3.free();
-    // NOTE(review): reuses attempt identifier (4, 1) from fi4; probably meant (5, 1).
-    FetchedInput fi5 = inputManager.allocate(requestSize, compressedSize, new InputAttemptIdentifier(4,
1));
-    assertEquals(FetchedInput.Type.DISK, fi5.getType());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0df10815/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/shuffle/common/impl/TestSimpleFetchedInputAllocator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/shuffle/common/impl/TestSimpleFetchedInputAllocator.java
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/shuffle/common/impl/TestSimpleFetchedInputAllocator.java
new file mode 100644
index 0000000..3ec839d
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/shuffle/common/impl/TestSimpleFetchedInputAllocator.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.shuffle.common.impl;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
+import org.apache.tez.runtime.library.shuffle.common.impl.SimpleFetchedInputAllocator;
+import org.junit.Test;
+
+/**
+ * Tests for {@link SimpleFetchedInputAllocator}.
+ */
+public class TestSimpleFetchedInputAllocator {
+
+  private static final Log LOG = LogFactory.getLog(TestSimpleFetchedInputAllocator.class);
+
+  /**
+   * Verifies that inputs are placed in memory while the budget allows, spill to disk
+   * once it is exhausted, and that freeing a MEMORY allocation (but not a DISK one)
+   * makes room for another in-memory allocation.
+   */
+  @Test
+  public void testInMemAllocation() throws IOException {
+    String localDirs = "/tmp/" + this.getClass().getName();
+    Configuration conf = new Configuration();
+
+    // Use a small, fixed task memory size instead of the JVM max heap. The allocator caps
+    // task memory at Integer.MAX_VALUE, so with a max heap above roughly 2.7 GB a budget
+    // derived from the heap no longer matched the allocator's limit and the expectations
+    // below failed. A small size also keeps the real in-memory buffers small.
+    long maxTaskMemory = 10L * 1024 * 1024;
+    LOG.info("maxTaskMemory: " + maxTaskMemory);
+
+    float bufferPercent = 0.1f;
+    conf.setFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, bufferPercent);
+    conf.setFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT, 1.0f);
+    conf.setStrings(TezJobConfig.LOCAL_DIRS, localDirs);
+
+    long inMemThreshold = (long) (bufferPercent * maxTaskMemory);
+    LOG.info("InMemThreshold: " + inMemThreshold);
+
+    SimpleFetchedInputAllocator inputManager = new SimpleFetchedInputAllocator(
+        UUID.randomUUID().toString(), conf, maxTaskMemory);
+    inputManager.setInitialMemoryAvailable(inMemThreshold);
+    inputManager.configureAndStart();
+
+    // Each request is 40% of the budget: two fit in memory, a third does not.
+    long requestSize = (long) (0.4f * inMemThreshold);
+    long compressedSize = 1L;
+    LOG.info("RequestSize: " + requestSize);
+
+    FetchedInput fi1 = inputManager.allocate(requestSize, compressedSize,
+        new InputAttemptIdentifier(1, 1));
+    assertEquals(FetchedInput.Type.MEMORY, fi1.getType());
+
+    FetchedInput fi2 = inputManager.allocate(requestSize, compressedSize,
+        new InputAttemptIdentifier(2, 1));
+    assertEquals(FetchedInput.Type.MEMORY, fi2.getType());
+
+    // Over limit by this point. Next reserve should give back a DISK allocation
+    FetchedInput fi3 = inputManager.allocate(requestSize, compressedSize,
+        new InputAttemptIdentifier(3, 1));
+    assertEquals(FetchedInput.Type.DISK, fi3.getType());
+
+    // Freed one memory allocation. Next should be mem again.
+    fi1.abort();
+    fi1.free();
+    FetchedInput fi4 = inputManager.allocate(requestSize, compressedSize,
+        new InputAttemptIdentifier(4, 1));
+    assertEquals(FetchedInput.Type.MEMORY, fi4.getType());
+
+    // Freed one disk allocation. Next should be disk again (no mem freed)
+    fi3.abort();
+    fi3.free();
+    FetchedInput fi5 = inputManager.allocate(requestSize, compressedSize,
+        new InputAttemptIdentifier(5, 1));
+    assertEquals(FetchedInput.Type.DISK, fi5.getType());
+  }
+
+}


Mime
View raw message