tez-commits mailing list archives

From ss...@apache.org
Subject [7/9] TEZ-1479. Disambiguate (refactor) between ShuffleInputEventHandlers and Fetchers. (sseth) (cherry picked from commit 7be5830a908602ff91a07d3020f2dddf7705d48f)
Date Wed, 15 Oct 2014 18:59:43 GMT
http://git-wip-us.apache.org/repos/asf/tez/blob/196bcca9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
deleted file mode 100644
index 7fd6125..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
+++ /dev/null
@@ -1,930 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.runtime.library.common.shuffle.impl;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.ChecksumFileSystem;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.FileChunk;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.util.Progressable;
-import org.apache.tez.common.TezUtilsInternal;
-import org.apache.tez.common.counters.TaskCounter;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.runtime.api.InputContext;
-import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
-import org.apache.tez.runtime.library.common.ConfigUtils;
-import org.apache.tez.runtime.library.common.Constants;
-import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
-import org.apache.tez.runtime.library.common.combine.Combiner;
-import org.apache.tez.runtime.library.common.sort.impl.IFile;
-import org.apache.tez.runtime.library.common.sort.impl.TezMerger;
-import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
-import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
-import org.apache.tez.runtime.library.common.sort.impl.TezMerger.Segment;
-import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
-import org.apache.tez.runtime.library.hadoop.compat.NullProgressable;
-
-
-/**
- * Usage: Create instance, setInitialMemoryAvailable(long), configureAndStart()
- *
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-@SuppressWarnings(value={"rawtypes"})
-public class MergeManager {
-  
-  private static final Log LOG = LogFactory.getLog(MergeManager.class);
-
-  private final Configuration conf;
-  private final FileSystem localFS;
-  private final FileSystem rfs;
-  private final LocalDirAllocator localDirAllocator;
-  
-  private final  TezTaskOutputFiles mapOutputFile;
-  private final Progressable nullProgressable = new NullProgressable();
-  private final Combiner combiner;  
-  
-  private final Set<MapOutput> inMemoryMergedMapOutputs = 
-    new TreeSet<MapOutput>(new MapOutput.MapOutputComparator());
-  private final IntermediateMemoryToMemoryMerger memToMemMerger;
-
-  private final Set<MapOutput> inMemoryMapOutputs = 
-    new TreeSet<MapOutput>(new MapOutput.MapOutputComparator());
-  private final InMemoryMerger inMemoryMerger;
-  
-  private final Set<FileChunk> onDiskMapOutputs = new TreeSet<FileChunk>();
-  private final OnDiskMerger onDiskMerger;
-  
-  private final long memoryLimit;
-  private final int postMergeMemLimit;
-  private long usedMemory;
-  private long commitMemory;
-  private final int ioSortFactor;
-  private final long maxSingleShuffleLimit;
-  
-  private final int memToMemMergeOutputsThreshold; 
-  private final long mergeThreshold;
-  
-  private final long initialMemoryAvailable;
-
-  private final ExceptionReporter exceptionReporter;
-  
-  private final InputContext inputContext;
-
-  private final TezCounter spilledRecordsCounter;
-
-  private final TezCounter reduceCombineInputCounter;
-
-  private final TezCounter mergedMapOutputsCounter;
-  
-  private final TezCounter numMemToDiskMerges;
-  private final TezCounter numDiskToDiskMerges;
-  private final TezCounter additionalBytesWritten;
-  private final TezCounter additionalBytesRead;
-  
-  private final CompressionCodec codec;
-  
-  private volatile boolean finalMergeComplete = false;
-  
-  private final boolean ifileReadAhead;
-  private final int ifileReadAheadLength;
-  private final int ifileBufferSize;
-
-
-  /**
-   * Construct the MergeManager. Must call configureAndStart() before it becomes usable.
-   */
-  public MergeManager(Configuration conf, 
-                      FileSystem localFS,
-                      LocalDirAllocator localDirAllocator,  
-                      InputContext inputContext,
-                      Combiner combiner,
-                      TezCounter spilledRecordsCounter,
-                      TezCounter reduceCombineInputCounter,
-                      TezCounter mergedMapOutputsCounter,
-                      ExceptionReporter exceptionReporter,
-                      long initialMemoryAvailable,
-                      CompressionCodec codec,
-                      boolean ifileReadAheadEnabled,
-                      int ifileReadAheadLength) {
-    this.inputContext = inputContext;
-    this.conf = conf;
-    this.localDirAllocator = localDirAllocator;
-    this.exceptionReporter = exceptionReporter;
-    this.initialMemoryAvailable = initialMemoryAvailable;
-    
-    this.combiner = combiner;
-
-    this.reduceCombineInputCounter = reduceCombineInputCounter;
-    this.spilledRecordsCounter = spilledRecordsCounter;
-    this.mergedMapOutputsCounter = mergedMapOutputsCounter;
-    this.mapOutputFile = new TezTaskOutputFiles(conf, inputContext.getUniqueIdentifier());
-    
-    this.localFS = localFS;
-    this.rfs = ((LocalFileSystem)localFS).getRaw();
-    
-    this.numDiskToDiskMerges = inputContext.getCounters().findCounter(TaskCounter.NUM_DISK_TO_DISK_MERGES);
-    this.numMemToDiskMerges = inputContext.getCounters().findCounter(TaskCounter.NUM_MEM_TO_DISK_MERGES);
-    this.additionalBytesWritten = inputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN);
-    this.additionalBytesRead = inputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ);
-
-    this.codec = codec;
-    this.ifileReadAhead = ifileReadAheadEnabled;
-    if (this.ifileReadAhead) {
-      this.ifileReadAheadLength = ifileReadAheadLength;
-    } else {
-      this.ifileReadAheadLength = 0;
-    }
-    this.ifileBufferSize = conf.getInt("io.file.buffer.size",
-        TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT);
-    
-    // Figure out initial memory req start
-    final float maxInMemCopyUse =
-      conf.getFloat(
-          TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT, 
-          TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT_DEFAULT);
-    if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
-      throw new IllegalArgumentException("Invalid value for " +
-          TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT + ": " +
-          maxInMemCopyUse);
-    }
-
-    // Allow unit tests to fix Runtime memory
-    long memLimit = (long) (conf.getLong(Constants.TEZ_RUNTIME_TASK_MEMORY,
-        Math.min(inputContext.getTotalMemoryAvailableToTask(), Integer.MAX_VALUE)) * maxInMemCopyUse);
-
-    float maxRedPer = conf.getFloat(TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT,
-        TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_BUFFER_PERCENT_DEFAULT);
-    if (maxRedPer > 1.0 || maxRedPer < 0.0) {
-      throw new TezUncheckedException(TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT + ": " + maxRedPer);
-    }
-    // TODO maxRedBuffer should be a long.
-    int maxRedBuffer = (int) Math.min(inputContext.getTotalMemoryAvailableToTask() * maxRedPer,
-        Integer.MAX_VALUE);
-    // Figure out initial memory req end
-    
-    if (this.initialMemoryAvailable < memLimit) {
-      this.memoryLimit = this.initialMemoryAvailable;
-    } else {
-      this.memoryLimit = memLimit;
-    }
-    
-    if (this.initialMemoryAvailable < maxRedBuffer) {
-      this.postMergeMemLimit = (int) this.initialMemoryAvailable;
-    } else {
-      this.postMergeMemLimit = maxRedBuffer;
-    }
-    
-    LOG.info("InitialRequest: ShuffleMem=" + memLimit + ", postMergeMem=" + maxRedBuffer
-        + ", RuntimeTotalAvailable=" + this.initialMemoryAvailable + "Updated to: ShuffleMem="
-        + this.memoryLimit + ", postMergeMem: " + this.postMergeMemLimit);
-
-    this.ioSortFactor = 
-        conf.getInt(
-            TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR, 
-            TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR_DEFAULT);
-    
-    final float singleShuffleMemoryLimitPercent =
-        conf.getFloat(
-            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT,
-            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT_DEFAULT);
-    if (singleShuffleMemoryLimitPercent <= 0.0f
-        || singleShuffleMemoryLimitPercent > 1.0f) {
-      throw new IllegalArgumentException("Invalid value for "
-          + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT + ": "
-          + singleShuffleMemoryLimitPercent);
-    }
-
-    this.maxSingleShuffleLimit = 
-      (long)(memoryLimit * singleShuffleMemoryLimitPercent);
-    this.memToMemMergeOutputsThreshold = 
-            conf.getInt(
-                TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMTOMEM_SEGMENTS, 
-                ioSortFactor);
-    this.mergeThreshold = 
-        (long)(this.memoryLimit * 
-               conf.getFloat(
-                   TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, 
-                   TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT_DEFAULT));
-    LOG.info("MergerManager: memoryLimit=" + memoryLimit + ", " +
-             "maxSingleShuffleLimit=" + maxSingleShuffleLimit + ", " +
-             "mergeThreshold=" + mergeThreshold + ", " + 
-             "ioSortFactor=" + ioSortFactor + ", " +
-             "memToMemMergeOutputsThreshold=" + memToMemMergeOutputsThreshold);
-    
-    if (this.maxSingleShuffleLimit >= this.mergeThreshold) {
-      throw new RuntimeException("Invlaid configuration: "
-          + "maxSingleShuffleLimit should be less than mergeThreshold"
-          + "maxSingleShuffleLimit: " + this.maxSingleShuffleLimit
-          + "mergeThreshold: " + this.mergeThreshold);
-    }
-    
-    boolean allowMemToMemMerge = 
-        conf.getBoolean(
-            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM, 
-            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM_DEFAULT);
-    if (allowMemToMemMerge) {
-      this.memToMemMerger =
-          new IntermediateMemoryToMemoryMerger(this,
-              memToMemMergeOutputsThreshold);
-    } else {
-      this.memToMemMerger = null;
-    }
-
-    this.inMemoryMerger = new InMemoryMerger(this);
-
-    this.onDiskMerger = new OnDiskMerger(this);
-  }
-
-  @Private
-  void configureAndStart() {
-    if (this.memToMemMerger != null) {
-      memToMemMerger.start();
-    }
-    this.inMemoryMerger.start();
-    this.onDiskMerger.start();
-  }
-
-  /**
-   * Exposing this to get an initial memory ask without instantiating the object.
-   */
-  @Private
-  static long getInitialMemoryRequirement(Configuration conf, long maxAvailableTaskMemory) {
-    final float maxInMemCopyUse =
-        conf.getFloat(
-            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT, 
-            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT_DEFAULT);
-    if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
-      throw new IllegalArgumentException("Invalid value for " +
-          TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT + ": " +
-          maxInMemCopyUse);
-    }
-
-    // Allow unit tests to fix Runtime memory
-    long memLimit = (long) (conf.getLong(Constants.TEZ_RUNTIME_TASK_MEMORY,
-        Math.min(maxAvailableTaskMemory, Integer.MAX_VALUE)) * maxInMemCopyUse);
-
-    LOG.info("Initial Shuffle Memory Required: " + memLimit + ", based on INPUT_BUFFER_factor: " + maxInMemCopyUse);
-
-    float maxRedPer = conf.getFloat(TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT,
-        TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_BUFFER_PERCENT_DEFAULT);
-    if (maxRedPer > 1.0 || maxRedPer < 0.0) {
-      throw new TezUncheckedException(TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT + ": " + maxRedPer);
-    }
-    // TODO maxRedBuffer should be a long.
-    int maxRedBuffer = (int) Math.min(maxAvailableTaskMemory * maxRedPer,
-        Integer.MAX_VALUE);
-    LOG.info("Initial Memory required for final merged output: " + maxRedBuffer + ", using factor: " + maxRedPer);
-
-    long reqMem = Math.max(maxRedBuffer, memLimit);
-    return reqMem;
-  }
-
-  public void waitForInMemoryMerge() throws InterruptedException {
-    inMemoryMerger.waitForMerge();
-  }
-  
-  private boolean canShuffleToMemory(long requestedSize) {
-    return (requestedSize < maxSingleShuffleLimit);
-  }
-
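-  // Sentinel MapOutput returned by reserve() to signal that the caller should
-  // wait and retry once memory has been released (memory limit reached).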
-  private final MapOutput stallShuffle = MapOutput.createWaitMapOutput(null);
-
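-  // Reserves space for a fetched output: shuffles to disk when the request is
-  // larger than maxSingleShuffleLimit, stalls the caller when usedMemory is
-  // already over memoryLimit, and otherwise reserves in-memory space.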
-  public synchronized MapOutput reserve(InputAttemptIdentifier srcAttemptIdentifier, 
-                                             long requestedSize,
-                                             long compressedLength,
-                                             int fetcher
-                                             ) throws IOException {
-    if (!canShuffleToMemory(requestedSize)) {
-      LOG.info(srcAttemptIdentifier + ": Shuffling to disk since " + requestedSize + 
-               " is greater than maxSingleShuffleLimit (" + 
-               maxSingleShuffleLimit + ")");
-      return MapOutput.createDiskMapOutput(srcAttemptIdentifier, this, compressedLength, conf,
-          fetcher, true, mapOutputFile);
-    }
-    
-    // Stall shuffle if we are above the memory limit
-
-    // It is possible that all threads could just be stalling and not make
-    // progress at all. This could happen when:
-    //
-    // requested size is causing the used memory to go above limit &&
-    // requested size < singleShuffleLimit &&
-    // current used size < mergeThreshold (merge will not get triggered)
-    //
-    // To avoid this from happening, we allow exactly one thread to go past
-    // the memory limit. We check (usedMemory > memoryLimit) and not
-    // (usedMemory + requestedSize > memoryLimit). When this thread is done
-    // fetching, this will automatically trigger a merge thereby unlocking
-    // all the stalled threads
-    
-    if (usedMemory > memoryLimit) {
-      LOG.debug(srcAttemptIdentifier + ": Stalling shuffle since usedMemory (" + usedMemory
-          + ") is greater than memoryLimit (" + memoryLimit + ")." + 
-          " CommitMemory is (" + commitMemory + ")"); 
-      return stallShuffle;
-    }
-    
-    // Allow the in-memory shuffle to progress
-    LOG.debug(srcAttemptIdentifier + ": Proceeding with shuffle since usedMemory ("
-        + usedMemory + ") is lesser than memoryLimit (" + memoryLimit + ")."
-        + "CommitMemory is (" + commitMemory + ")"); 
-    return unconditionalReserve(srcAttemptIdentifier, requestedSize, true);
-  }
-  
-  /**
-   * Unconditional Reserve is used by the Memory-to-Memory thread
-   */
-  private synchronized MapOutput unconditionalReserve(
-      InputAttemptIdentifier srcAttemptIdentifier, long requestedSize, boolean primaryMapOutput) throws
-      IOException {
-    usedMemory += requestedSize;
-    return MapOutput.createMemoryMapOutput(srcAttemptIdentifier, this, (int)requestedSize,
-        primaryMapOutput);
-  }
-  
-  synchronized void unreserve(long size) {
-    commitMemory -= size;
-    usedMemory -= size;
-  }
-
-  public synchronized void closeInMemoryFile(MapOutput mapOutput) { 
-    inMemoryMapOutputs.add(mapOutput);
-    LOG.info("closeInMemoryFile -> map-output of size: " + mapOutput.getSize()
-        + ", inMemoryMapOutputs.size() -> " + inMemoryMapOutputs.size()
-        + ", commitMemory -> " + commitMemory + ", usedMemory ->" + usedMemory);
-
-    commitMemory+= mapOutput.getSize();
-
-    synchronized (inMemoryMerger) {
-      // Can hang if mergeThreshold is really low.
-      // TODO Can avoid spilling in case total input size is between
-      // mergeThreshold and total available size.
-      if (!inMemoryMerger.isInProgress() && commitMemory >= mergeThreshold) {
-        LOG.info("Starting inMemoryMerger's merge since commitMemory=" +
-            commitMemory + " > mergeThreshold=" + mergeThreshold + 
-            ". Current usedMemory=" + usedMemory);
-        inMemoryMapOutputs.addAll(inMemoryMergedMapOutputs);
-        inMemoryMergedMapOutputs.clear();
-        inMemoryMerger.startMerge(inMemoryMapOutputs);
-      } 
-    }
-
-    // This should likely run a Combiner.
-    if (memToMemMerger != null) {
-      synchronized (memToMemMerger) {
-        if (!memToMemMerger.isInProgress() && 
-            inMemoryMapOutputs.size() >= memToMemMergeOutputsThreshold) {
-          memToMemMerger.startMerge(inMemoryMapOutputs);
-        }
-      }
-    }
-  }
-  
-  
-  public synchronized void closeInMemoryMergedFile(MapOutput mapOutput) {
-    inMemoryMergedMapOutputs.add(mapOutput);
-    LOG.info("closeInMemoryMergedFile -> size: " + mapOutput.getSize() + 
-             ", inMemoryMergedMapOutputs.size() -> " + 
-             inMemoryMergedMapOutputs.size());
-  }
-  
-  public synchronized void closeOnDiskFile(FileChunk file) {
-    onDiskMapOutputs.add(file);
-
-    synchronized (onDiskMerger) {
-      if (!onDiskMerger.isInProgress() &&
-          onDiskMapOutputs.size() >= (2 * ioSortFactor - 1)) {
-        onDiskMerger.startMerge(onDiskMapOutputs);
-      }
-    }
-  }
-
-  /**
-   * Should <b>only</b> be used after the shuffle phase is complete, otherwise it can
-   * return an invalid state since a merge may not be in progress due to
-   * inadequate inputs.
-   * 
-   * @return true if the merge process is complete, otherwise false
-   */
-  @Private
-  public boolean isMergeComplete() {
-    return finalMergeComplete;
-  }
-  
-  public TezRawKeyValueIterator close() throws Throwable {
-    // Wait for on-going merges to complete
-    if (memToMemMerger != null) { 
-      memToMemMerger.close();
-    }
-    inMemoryMerger.close();
-    onDiskMerger.close();
-    
-    List<MapOutput> memory = 
-      new ArrayList<MapOutput>(inMemoryMergedMapOutputs);
-    inMemoryMergedMapOutputs.clear();
-    memory.addAll(inMemoryMapOutputs);
-    inMemoryMapOutputs.clear();
-    List<FileChunk> disk = new ArrayList<FileChunk>(onDiskMapOutputs);
-    onDiskMapOutputs.clear();
-    TezRawKeyValueIterator kvIter = finalMerge(conf, rfs, memory, disk);
-    this.finalMergeComplete = true;
-    return kvIter;
-  }
-   
-  void runCombineProcessor(TezRawKeyValueIterator kvIter, Writer writer)
-      throws IOException, InterruptedException {
-    combiner.combine(kvIter, writer);
-  }
-
-  /**
-   * Merges multiple in-memory segments into another in-memory segment
-   */
-  private class IntermediateMemoryToMemoryMerger 
-  extends MergeThread<MapOutput> {
-    
-    public IntermediateMemoryToMemoryMerger(MergeManager manager, 
-                                            int mergeFactor) {
-      super(manager, mergeFactor, exceptionReporter);
-      setName("MemToMemMerger [" + TezUtilsInternal
-          .cleanVertexName(inputContext.getSourceVertexName()) + "]");
-      setDaemon(true);
-    }
-
-    @Override
-    public void merge(List<MapOutput> inputs) throws IOException {
-      if (inputs == null || inputs.size() == 0) {
-        return;
-      }
-
-      InputAttemptIdentifier dummyMapId = inputs.get(0).getAttemptIdentifier(); 
-      List<Segment> inMemorySegments = new ArrayList<Segment>();
-      long mergeOutputSize = 
-        createInMemorySegments(inputs, inMemorySegments, 0);
-      int noInMemorySegments = inMemorySegments.size();
-      
-      MapOutput mergedMapOutputs = 
-        unconditionalReserve(dummyMapId, mergeOutputSize, false);
-      
-      Writer writer = 
-        new InMemoryWriter(mergedMapOutputs.getArrayStream());
-
-      LOG.info("Initiating Memory-to-Memory merge with " + noInMemorySegments +
-               " segments of total-size: " + mergeOutputSize);
-
-      // Nothing will be materialized to disk because the sort factor is being
-      // set to the number of in memory segments.
-      // TODO Is this doing any combination ?
-      TezRawKeyValueIterator rIter = 
-        TezMerger.merge(conf, rfs,
-                       ConfigUtils.getIntermediateInputKeyClass(conf),
-                       ConfigUtils.getIntermediateInputValueClass(conf),
-                       inMemorySegments, inMemorySegments.size(),
-                       new Path(inputContext.getUniqueIdentifier()),
-                       (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
-                       nullProgressable, null, null, null, null); 
-      TezMerger.writeFile(rIter, writer, nullProgressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
-      writer.close();
-
-      LOG.info(inputContext.getUniqueIdentifier() +  
-               " Memory-to-Memory merge of the " + noInMemorySegments +
-               " files in-memory complete.");
-
-      // Note the output of the merge
-      closeInMemoryMergedFile(mergedMapOutputs);
-    }
-  }
-  
-  /**
-   * Merges multiple in-memory segments into a disk segment
-   */
-  private class InMemoryMerger extends MergeThread<MapOutput> {
-    
-    public InMemoryMerger(MergeManager manager) {
-      super(manager, Integer.MAX_VALUE, exceptionReporter);
-      setName("MemtoDiskMerger [" + TezUtilsInternal
-          .cleanVertexName(inputContext.getSourceVertexName()) + "]");
-      setDaemon(true);
-    }
-    
-    @Override
-    public void merge(List<MapOutput> inputs) throws IOException, InterruptedException {
-      if (inputs == null || inputs.size() == 0) {
-        return;
-      }
-      
-      numMemToDiskMerges.increment(1);
-      
-      // Name this output file the same as the name of the first file in the
-      // current list of in-memory files (this name is guaranteed to be absent
-      // on disk currently, so we don't overwrite a previously created spill).
-      // Also, we need to create the output file now, since it is not
-      // guaranteed that this file will be present after merge is called
-      // (we delete empty files as soon as we see them in the merge method).
-
-      //figure out the mapId 
-      InputAttemptIdentifier srcTaskIdentifier = inputs.get(0).getAttemptIdentifier();
-
-      List<Segment> inMemorySegments = new ArrayList<Segment>();
-      long mergeOutputSize = 
-        createInMemorySegments(inputs, inMemorySegments,0);
-      int noInMemorySegments = inMemorySegments.size();
-
-      // TODO Maybe track serialized vs deserialized bytes.
-      
-      // All disk writes done by this merge are overhead - due to the lack of
-      // adequate memory to keep all segments in memory.
-      Path outputPath = mapOutputFile.getInputFileForWrite(
-          srcTaskIdentifier.getInputIdentifier().getInputIndex(),
-          mergeOutputSize).suffix(Constants.MERGED_OUTPUT_PREFIX);
-
-      Writer writer = null;
-      long outFileLen = 0;
-      try {
-        writer =
-            new Writer(conf, rfs, outputPath,
-                (Class)ConfigUtils.getIntermediateInputKeyClass(conf),
-                (Class)ConfigUtils.getIntermediateInputValueClass(conf),
-                codec, null, null);
-
-        TezRawKeyValueIterator rIter = null;
-        LOG.info("Initiating in-memory merge with " + noInMemorySegments + 
-            " segments...");
-
-        // Nothing actually materialized to disk - controlled by setting sort-factor to #segments.
-        rIter = TezMerger.merge(conf, rfs,
-            (Class)ConfigUtils.getIntermediateInputKeyClass(conf),
-            (Class)ConfigUtils.getIntermediateInputValueClass(conf),
-            inMemorySegments, inMemorySegments.size(),
-            new Path(inputContext.getUniqueIdentifier()),
-            (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
-            nullProgressable, spilledRecordsCounter, null, additionalBytesRead, null);
-        // spilledRecordsCounter is tracking the number of keys that will be
-        // read from each of the segments being merged - which is essentially
-        // what will be written to disk.
-
-        if (null == combiner) {
-          TezMerger.writeFile(rIter, writer, nullProgressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
-        } else {
-          // TODO Counters for Combine
-          runCombineProcessor(rIter, writer);
-        }
-        writer.close();
-        additionalBytesWritten.increment(writer.getCompressedLength());
-        writer = null;
-
-        outFileLen = localFS.getFileStatus(outputPath).getLen();
-        LOG.info(inputContext.getUniqueIdentifier() +
-            " Merge of the " + noInMemorySegments +
-            " files in-memory complete." +
-            " Local file is " + outputPath + " of size " +
-            outFileLen);
-      } catch (IOException e) { 
-        //make sure that we delete the ondisk file that we created 
-        //earlier when we invoked cloneFileAttributes
-        localFS.delete(outputPath, true);
-        throw e;
-      } finally {
-        if (writer != null) {
-          writer.close();
-        }
-      }
-
-      // Note the output of the merge
-      closeOnDiskFile(new FileChunk(outputPath, 0, outFileLen, false));
-    }
-
-  }
-
-  /**
-   * Merges multiple on-disk segments
-   */
-  private class OnDiskMerger extends MergeThread<FileChunk> {
-
-    public OnDiskMerger(MergeManager manager) {
-      super(manager, ioSortFactor, exceptionReporter);
-      setName("DiskToDiskMerger [" + TezUtilsInternal
-          .cleanVertexName(inputContext.getSourceVertexName()) + "]");
-      setDaemon(true);
-    }
-    
-    @Override
-    public void merge(List<FileChunk> inputs) throws IOException {
-      // sanity check
-      if (inputs == null || inputs.isEmpty()) {
-        LOG.info("No ondisk files to merge...");
-        return;
-      }
-      numDiskToDiskMerges.increment(1);
-      
-      long approxOutputSize = 0;
-      int bytesPerSum = 
-        conf.getInt("io.bytes.per.checksum", 512);
-      
-      LOG.info("OnDiskMerger: We have  " + inputs.size() + 
-               " map outputs on disk. Triggering merge...");
-
-      List<Segment> inputSegments = new ArrayList<Segment>(inputs.size());
-
-      // 1. Prepare the list of files to be merged.
-      for (FileChunk fileChunk : inputs) {
-        final long offset = fileChunk.getOffset();
-        final long size = fileChunk.getLength();
-        final boolean preserve = fileChunk.preserveAfterUse();
-        final Path file = fileChunk.getPath();
-        approxOutputSize += size;
-        Segment segment = new Segment(conf, rfs, file, offset, size, codec, ifileReadAhead,
-            ifileReadAheadLength, ifileBufferSize, preserve);
-        inputSegments.add(segment);
-      }
-
-      // add the checksum length
-      approxOutputSize += 
-        ChecksumFileSystem.getChecksumLength(approxOutputSize, bytesPerSum);
-
-      // 2. Start the on-disk merge process
-      Path outputPath = 
-        localDirAllocator.getLocalPathForWrite(inputs.get(0).getPath().toString(),
-            approxOutputSize, conf).suffix(Constants.MERGED_OUTPUT_PREFIX);
-      Writer writer = 
-        new Writer(conf, rfs, outputPath, 
-                        (Class)ConfigUtils.getIntermediateInputKeyClass(conf), 
-                        (Class)ConfigUtils.getIntermediateInputValueClass(conf),
-                        codec, null, null);
-      Path tmpDir = new Path(inputContext.getUniqueIdentifier());
-      try {
-        TezRawKeyValueIterator iter = TezMerger.merge(conf, rfs,
-            (Class)ConfigUtils.getIntermediateInputKeyClass(conf),
-            (Class)ConfigUtils.getIntermediateInputValueClass(conf),
-            inputSegments,
-            ioSortFactor, tmpDir,
-            (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
-            nullProgressable, true, spilledRecordsCounter, null,
-            mergedMapOutputsCounter, null);
-
-        // TODO Maybe differentiate between data written because of Merges and
-        // the finalMerge (i.e. final mem available may be different from
-        // initial merge mem)
-        TezMerger.writeFile(iter, writer, nullProgressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
-        writer.close();
-        additionalBytesWritten.increment(writer.getCompressedLength());
-      } catch (IOException e) {
-        localFS.delete(outputPath, true);
-        throw e;
-      }
-
-      final long outputLen = localFS.getFileStatus(outputPath).getLen();
-      closeOnDiskFile(new FileChunk(outputPath, 0, outputLen, false));
-
-      LOG.info(inputContext.getUniqueIdentifier() +
-          " Finished merging " + inputs.size() + 
-          " map output files on disk of total-size " + 
-          approxOutputSize + "." + 
-          " Local output file is " + outputPath + " of size " +
-          outputLen);
-    }
-  }
-  
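-  // Drains in-memory map outputs into IFile segments until at most leaveBytes
-  // of in-memory data remain; returns the total size of the drained segments.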
-  private long createInMemorySegments(List<MapOutput> inMemoryMapOutputs,
-                                      List<Segment> inMemorySegments, 
-                                      long leaveBytes
-                                      ) throws IOException {
-    long totalSize = 0L;
-    // fullSize could come from the RamManager, but files can be closed but
-    // not yet present in inMemoryMapOutputs
-    long fullSize = 0L;
-    for (MapOutput mo : inMemoryMapOutputs) {
-      fullSize += mo.getMemory().length;
-    }
-    while(fullSize > leaveBytes) {
-      MapOutput mo = inMemoryMapOutputs.remove(0);
-      byte[] data = mo.getMemory();
-      long size = data.length;
-      totalSize += size;
-      fullSize -= size;
-      IFile.Reader reader = new InMemoryReader(MergeManager.this, 
-                                                   mo.getAttemptIdentifier(),
-                                                   data, 0, (int)size);
-      inMemorySegments.add(new Segment(reader, true, 
-                                            (mo.isPrimaryMapOutput() ? 
-                                            mergedMapOutputsCounter : null)));
-    }
-    return totalSize;
-  }
-
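-  // Adapts a TezRawKeyValueIterator to the IFile.Reader interface so that the
-  // result of an intermediate merge can be fed into the final merge as a Segment.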
-  class RawKVIteratorReader extends IFile.Reader {
-
-    private final TezRawKeyValueIterator kvIter;
-
-    public RawKVIteratorReader(TezRawKeyValueIterator kvIter, long size)
-        throws IOException {
-      super(null, size, null, spilledRecordsCounter, null, ifileReadAhead,
-          ifileReadAheadLength, ifileBufferSize);
-      this.kvIter = kvIter;
-    }
-    @Override
-    public KeyState readRawKey(DataInputBuffer key) throws IOException {
-      if (kvIter.next()) {
-        final DataInputBuffer kb = kvIter.getKey();
-        final int kp = kb.getPosition();
-        final int klen = kb.getLength() - kp;
-        key.reset(kb.getData(), kp, klen);
-        bytesRead += klen;
-        return KeyState.NEW_KEY;
-      }
-      return KeyState.NO_KEY;
-    }
-    public void nextRawValue(DataInputBuffer value) throws IOException {
-      final DataInputBuffer vb = kvIter.getValue();
-      final int vp = vb.getPosition();
-      final int vlen = vb.getLength() - vp;
-      value.reset(vb.getData(), vp, vlen);
-      bytesRead += vlen;
-    }
-    public long getPosition() throws IOException {
-      return bytesRead;
-    }
-
-    public void close() throws IOException {
-      kvIter.close();
-    }
-  }
-
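-  // Performs the final merge of the remaining in-memory and on-disk outputs,
-  // returning a single iterator over the fully merged key-value pairs.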
-  private TezRawKeyValueIterator finalMerge(Configuration job, FileSystem fs,
-                                       List<MapOutput> inMemoryMapOutputs,
-                                       List<FileChunk> onDiskMapOutputs
-                                       ) throws IOException {
-    LOG.info("finalMerge called with " + 
-             inMemoryMapOutputs.size() + " in-memory map-outputs and " + 
-             onDiskMapOutputs.size() + " on-disk map-outputs");
-    
-    
-    
-
-    // merge config params
-    Class keyClass = (Class)ConfigUtils.getIntermediateInputKeyClass(job);
-    Class valueClass = (Class)ConfigUtils.getIntermediateInputValueClass(job);
-    final Path tmpDir = new Path(inputContext.getUniqueIdentifier());
-    final RawComparator comparator =
-      (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(job);
-
-    // segments required to vacate memory
-    List<Segment> memDiskSegments = new ArrayList<Segment>();
-    long inMemToDiskBytes = 0;
-    boolean mergePhaseFinished = false;
-    if (inMemoryMapOutputs.size() > 0) {
-      int srcTaskId = inMemoryMapOutputs.get(0).getAttemptIdentifier().getInputIdentifier().getInputIndex();
-      inMemToDiskBytes = createInMemorySegments(inMemoryMapOutputs, 
-                                                memDiskSegments,
-                                                this.postMergeMemLimit);
-      final int numMemDiskSegments = memDiskSegments.size();
-      if (numMemDiskSegments > 0 &&
-            ioSortFactor > onDiskMapOutputs.size()) {
-        
-        // If we reach here, it implies that we have less than io.sort.factor
-        // disk segments and this will be incremented by 1 (result of the 
-        // memory segments merge). Since this total would still be 
-        // <= io.sort.factor, we will not do any more intermediate merges,
-        // the merge of all these disk segments would be directly fed to the
-        // reduce method
-        
-        mergePhaseFinished = true;
-        // must spill to disk, but can't retain in-mem for intermediate merge
-        final Path outputPath = 
-          mapOutputFile.getInputFileForWrite(srcTaskId,
-                                             inMemToDiskBytes).suffix(
-                                                 Constants.MERGED_OUTPUT_PREFIX);
-        final TezRawKeyValueIterator rIter = TezMerger.merge(job, fs, keyClass, valueClass,
-            memDiskSegments, numMemDiskSegments, tmpDir, comparator, nullProgressable,
-            spilledRecordsCounter, null, additionalBytesRead, null);
-        final Writer writer = new Writer(job, fs, outputPath,
-            keyClass, valueClass, codec, null, null);
-        try {
-          TezMerger.writeFile(rIter, writer, nullProgressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
-        } catch (IOException e) {
-          if (null != outputPath) {
-            try {
-              fs.delete(outputPath, true);
-            } catch (IOException ie) {
-              // NOTHING
-            }
-          }
-          throw e;
-        } finally {
-          if (null != writer) {
-            writer.close();
-            additionalBytesWritten.increment(writer.getCompressedLength());
-          }
-        }
-
-        final FileStatus fStatus = localFS.getFileStatus(outputPath);
-        // add to list of final disk outputs.
-        onDiskMapOutputs.add(new FileChunk(outputPath, 0, fStatus.getLen(), false));
-
-        LOG.info("Merged " + numMemDiskSegments + " segments, " +
-                 inMemToDiskBytes + " bytes to disk to satisfy " +
-                 "reduce memory limit");
-        inMemToDiskBytes = 0;
-        memDiskSegments.clear();
-      } else if (inMemToDiskBytes != 0) {
-        LOG.info("Keeping " + numMemDiskSegments + " segments, " +
-                 inMemToDiskBytes + " bytes in memory for " +
-                 "intermediate, on-disk merge");
-      }
-    }
-
-    // segments on disk
-    List<Segment> diskSegments = new ArrayList<Segment>();
-    long onDiskBytes = inMemToDiskBytes;
-    FileChunk[] onDisk = onDiskMapOutputs.toArray(new FileChunk[onDiskMapOutputs.size()]);
-    for (FileChunk fileChunk : onDisk) {
-      final long fileLength = fileChunk.getLength();
-      onDiskBytes += fileLength;
-      LOG.debug("Disk file: " + fileChunk.getPath() + " Length is " + fileLength);
-
-      final Path file = fileChunk.getPath();
-      TezCounter counter =
-          file.toString().endsWith(Constants.MERGED_OUTPUT_PREFIX) ? null : mergedMapOutputsCounter;
-
-      final long fileOffset = fileChunk.getOffset();
-      final boolean preserve = fileChunk.preserveAfterUse();
-      diskSegments.add(new Segment(job, fs, file, fileOffset, fileLength, codec, ifileReadAhead,
-                                   ifileReadAheadLength, ifileBufferSize, preserve, counter));
-    }
-    LOG.info("Merging " + onDisk.length + " files, " +
-             onDiskBytes + " bytes from disk");
-    Collections.sort(diskSegments, new Comparator<Segment>() {
-      public int compare(Segment o1, Segment o2) {
-        if (o1.getLength() == o2.getLength()) {
-          return 0;
-        }
-        return o1.getLength() < o2.getLength() ? -1 : 1;
-      }
-    });
-
-    // Build the final list of segments to merge, backed by disk + in-mem
-    List<Segment> finalSegments = new ArrayList<Segment>();
-    long inMemBytes = createInMemorySegments(inMemoryMapOutputs, 
-                                             finalSegments, 0);
-    LOG.info("Merging " + finalSegments.size() + " segments, " +
-             inMemBytes + " bytes from memory into reduce");
-    if (0 != onDiskBytes) {
-      final int numInMemSegments = memDiskSegments.size();
-      diskSegments.addAll(0, memDiskSegments);
-      memDiskSegments.clear();
-      TezRawKeyValueIterator diskMerge = TezMerger.merge(
-          job, fs, keyClass, valueClass, codec, diskSegments,
-          ioSortFactor, numInMemSegments, tmpDir, comparator,
-          nullProgressable, false, spilledRecordsCounter, null, additionalBytesRead, null);
-      diskSegments.clear();
-      if (0 == finalSegments.size()) {
-        return diskMerge;
-      }
-      finalSegments.add(new Segment(
-            new RawKVIteratorReader(diskMerge, onDiskBytes), true));
-    }
-    // This is doing nothing but creating an iterator over the segments.
-    return TezMerger.merge(job, fs, keyClass, valueClass,
-                 finalSegments, finalSegments.size(), tmpDir,
-                 comparator, nullProgressable, spilledRecordsCounter, null,
-                 additionalBytesRead, null);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/196bcca9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeThread.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeThread.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeThread.java
deleted file mode 100644
index aed2628..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeThread.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.runtime.library.common.shuffle.impl;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-abstract class MergeThread<T> extends Thread {
-  
-  private static final Log LOG = LogFactory.getLog(MergeThread.class);
-
-  private volatile boolean inProgress = false;
-  private final List<T> inputs = new ArrayList<T>();
-  protected final MergeManager manager;
-  private final ExceptionReporter reporter;
-  private boolean closed = false;
-  private final int mergeFactor;
-  
-  public MergeThread(MergeManager manager, int mergeFactor,
-                     ExceptionReporter reporter) {
-    this.manager = manager;
-    this.mergeFactor = mergeFactor;
-    this.reporter = reporter;
-  }
-  
-  public synchronized void close() throws InterruptedException {
-    closed = true;
-    waitForMerge();
-    interrupt();
-  }
-
-  public synchronized boolean isInProgress() {
-    return inProgress;
-  }
-  
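-  // Grabs up to mergeFactor inputs from the given set (removing them from the
-  // set) and wakes up the merge thread.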
-  public synchronized void startMerge(Set<T> inputs) {
-    if (!closed) {
-      this.inputs.clear();
-      inProgress = true;
-      Iterator<T> iter=inputs.iterator();
-      for (int ctr = 0; iter.hasNext() && ctr < mergeFactor; ++ctr) {
-        this.inputs.add(iter.next());
-        iter.remove();
-      }
-      LOG.info(getName() + ": Starting merge with " + this.inputs.size() + 
-               " segments, while ignoring " + inputs.size() + " segments");
-      notifyAll();
-    }
-  }
-
-  public synchronized void waitForMerge() throws InterruptedException {
-    while (inProgress) {
-      wait();
-    }
-  }
-
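-  // Merge loop: wait until startMerge() signals pending work, run merge() on
-  // the captured inputs, then reset state and notify waiters in waitForMerge().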
-  public void run() {
-    while (true) {
-      try {
-        // Wait for notification to start the merge...
-        synchronized (this) {
-          while (!inProgress) {
-            wait();
-          }
-        }
-
-        // Merge
-        merge(inputs);
-      } catch (InterruptedException ie) {
-        return;
-      } catch(Throwable t) {
-        reporter.reportException(t);
-        return;
-      } finally {
-        synchronized (this) {
-          // Clear inputs
-          inputs.clear();
-          inProgress = false;        
-          notifyAll();
-        }
-      }
-    }
-  }
-
-  public abstract void merge(List<T> inputs) 
-      throws IOException, InterruptedException;
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/196bcca9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
deleted file mode 100644
index 8214a0c..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
+++ /dev/null
@@ -1,484 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.runtime.library.common.shuffle.impl;
-
-import java.io.IOException;
-import java.lang.reflect.UndeclaredThrowableException;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.crypto.SecretKey;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.TezRuntimeFrameworkConfigs;
-import org.apache.tez.common.TezUtilsInternal;
-import org.apache.tez.common.counters.TaskCounter;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.dag.api.TezConstants;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.InputContext;
-import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
-import org.apache.tez.runtime.library.common.ConfigUtils;
-import org.apache.tez.runtime.library.common.TezRuntimeUtils;
-import org.apache.tez.runtime.library.common.combine.Combiner;
-import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
-import org.apache.tez.runtime.library.exceptions.InputAlreadyClosedException;
-import org.apache.tez.runtime.library.shuffle.common.HttpConnection;
-import org.apache.tez.runtime.library.shuffle.common.HttpConnection.HttpConnectionParams;
-import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-/**
- * Usage: Create instance, setInitialMemoryAllocated(long), run()
- *
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class Shuffle implements ExceptionReporter {
-  
-  private static final Log LOG = LogFactory.getLog(Shuffle.class);
-  private static final int PROGRESS_FREQUENCY = 2000;
-  
-  private final Configuration conf;
-  private final InputContext inputContext;
-  
-  private final ShuffleClientMetrics metrics;
-
-  private final ShuffleInputEventHandler eventHandler;
-  private final ShuffleScheduler scheduler;
-  private final MergeManager merger;
-
-  private final SecretKey jobTokenSecret;
-  private final CompressionCodec codec;
-  private final boolean ifileReadAhead;
-  private final int ifileReadAheadLength;
-  private final int numFetchers;
-  private final boolean localDiskFetchEnabled;
-  
-  private Throwable throwable = null;
-  private String throwingThreadName = null;
-
-  private final RunShuffleCallable runShuffleCallable;
-  private volatile ListenableFuture<TezRawKeyValueIterator> runShuffleFuture;
-  private final ListeningExecutorService executor;
-  
-  private final String srcNameTrimmed;
-  
-  private final List<Fetcher> fetchers;
-  private final HttpConnectionParams httpConnectionParams;
-  
-  private AtomicBoolean isShutDown = new AtomicBoolean(false);
-  private AtomicBoolean fetchersClosed = new AtomicBoolean(false);
-  private AtomicBoolean schedulerClosed = new AtomicBoolean(false);
-  private AtomicBoolean mergerClosed = new AtomicBoolean(false);
-
-  public Shuffle(InputContext inputContext, Configuration conf, int numInputs,
-      long initialMemoryAvailable) throws IOException {
-    this.inputContext = inputContext;
-    this.conf = conf;
-    this.httpConnectionParams =
-        ShuffleUtils.constructHttpShuffleConnectionParams(conf);
-    this.metrics = new ShuffleClientMetrics(inputContext.getDAGName(),
-        inputContext.getTaskVertexName(), inputContext.getTaskIndex(),
-        this.conf, UserGroupInformation.getCurrentUser().getShortUserName());
-    
-    this.srcNameTrimmed = TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName());
-    
-    this.jobTokenSecret = ShuffleUtils
-        .getJobTokenSecretFromTokenBytes(inputContext
-            .getServiceConsumerMetaData(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID));
-    
-    if (ConfigUtils.isIntermediateInputCompressed(conf)) {
-      Class<? extends CompressionCodec> codecClass =
-          ConfigUtils.getIntermediateInputCompressorClass(conf, DefaultCodec.class);
-      codec = ReflectionUtils.newInstance(codecClass, conf);
-    } else {
-      codec = null;
-    }
-    this.ifileReadAhead = conf.getBoolean(
-        TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD,
-        TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT);
-    if (this.ifileReadAhead) {
-      this.ifileReadAheadLength = conf.getInt(
-          TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES,
-          TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT);
-    } else {
-      this.ifileReadAheadLength = 0;
-    }
-    
-    Combiner combiner = TezRuntimeUtils.instantiateCombiner(conf, inputContext);
-    
-    FileSystem localFS = FileSystem.getLocal(this.conf);
-    LocalDirAllocator localDirAllocator = 
-        new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
-
-    // TODO TEZ Get rid of Map / Reduce references.
-    TezCounter shuffledInputsCounter = 
-        inputContext.getCounters().findCounter(TaskCounter.NUM_SHUFFLED_INPUTS);
-    TezCounter reduceShuffleBytes =
-        inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES);
-    TezCounter reduceDataSizeDecompressed = inputContext.getCounters().findCounter(
-        TaskCounter.SHUFFLE_BYTES_DECOMPRESSED);
-    TezCounter failedShuffleCounter =
-        inputContext.getCounters().findCounter(TaskCounter.NUM_FAILED_SHUFFLE_INPUTS);
-    TezCounter spilledRecordsCounter = 
-        inputContext.getCounters().findCounter(TaskCounter.SPILLED_RECORDS);
-    TezCounter reduceCombineInputCounter =
-        inputContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS);
-    TezCounter mergedMapOutputsCounter =
-        inputContext.getCounters().findCounter(TaskCounter.MERGED_MAP_OUTPUTS);
-    TezCounter bytesShuffedToDisk = inputContext.getCounters().findCounter(
-        TaskCounter.SHUFFLE_BYTES_TO_DISK);
-    TezCounter bytesShuffedToDiskDirect = inputContext.getCounters().findCounter(
-        TaskCounter.SHUFFLE_BYTES_DISK_DIRECT);
-    TezCounter bytesShuffedToMem = inputContext.getCounters().findCounter(
-        TaskCounter.SHUFFLE_BYTES_TO_MEM);
-    
-    LOG.info("Shuffle assigned with " + numInputs + " inputs" + ", codec: "
-        + (codec == null ? "None" : codec.getClass().getName()) + 
-        "ifileReadAhead: " + ifileReadAhead);
-
-    boolean sslShuffle = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL,
-      TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL_DEFAULT);
-    scheduler = new ShuffleScheduler(
-          this.inputContext,
-          this.conf,
-          numInputs,
-          this,
-          shuffledInputsCounter,
-          reduceShuffleBytes,
-          reduceDataSizeDecompressed,
-          failedShuffleCounter,
-          bytesShuffedToDisk,
-          bytesShuffedToDiskDirect,
-          bytesShuffedToMem);
-
-    merger = new MergeManager(
-          this.conf,
-          localFS,
-          localDirAllocator,
-          inputContext,
-          combiner,
-          spilledRecordsCounter,
-          reduceCombineInputCounter,
-          mergedMapOutputsCounter,
-          this,
-          initialMemoryAvailable,
-          codec,
-          ifileReadAhead,
-          ifileReadAheadLength);
-
-    eventHandler= new ShuffleInputEventHandler(
-        inputContext,
-        scheduler,
-        sslShuffle);
-    
-    ExecutorService rawExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
-        .setDaemon(true).setNameFormat("ShuffleAndMergeRunner [" + srcNameTrimmed + "]").build());
-
-    int configuredNumFetchers = 
-        conf.getInt(
-            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES, 
-            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES_DEFAULT);
-    numFetchers = Math.min(configuredNumFetchers, numInputs);
-    LOG.info("Num fetchers being started: " + numFetchers);
-    fetchers = Lists.newArrayListWithCapacity(numFetchers);
-    localDiskFetchEnabled = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH,
-        TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH_DEFAULT);
-
-    executor = MoreExecutors.listeningDecorator(rawExecutor);
-    runShuffleCallable = new RunShuffleCallable();
-  }
-
-  public void handleEvents(List<Event> events) throws IOException {
-    if (!isShutDown.get()) {
-      eventHandler.handleEvents(events);
-    } else {
-      LOG.info("Ignoring events since already shutdown. EventCount: " + events.size());
-    }
-
-  }
-  
-  /**
-   * Indicates whether the Shuffle and Merge processing is complete.
-   * @return false if not complete, true if complete or if an error occurred.
-   * @throws InterruptedException 
-   * @throws IOException 
-   * @throws InputAlreadyClosedException 
-   */
-  // ZZZ Deal with these methods.
-  public boolean isInputReady() throws IOException, InterruptedException, TezException {
-    if (isShutDown.get()) {
-      throw new InputAlreadyClosedException();
-    }
-    if (throwable != null) {
-      handleThrowable(throwable);
-    }
-    if (runShuffleFuture == null) {
-      return false;
-    }
-    // Don't need to check merge status, since runShuffleFuture will only
-    // complete once merge is complete.
-    return runShuffleFuture.isDone();
-  }
-
-  private void handleThrowable(Throwable t) throws IOException, InterruptedException {
-    if (t instanceof IOException) {
-      throw (IOException) t;
-    } else if (t instanceof InterruptedException) {
-      throw (InterruptedException) t;
-    } else {
-      throw new UndeclaredThrowableException(t);
-    }
-  }
-
-  /**
-   * Waits for the Shuffle and Merge to complete, and returns an iterator over the input.
-   * @return an iterator over the fetched input.
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  // ZZZ Deal with these methods.
-  public TezRawKeyValueIterator waitForInput() throws IOException, InterruptedException,
-      TezException {
-    Preconditions.checkState(runShuffleFuture != null,
-        "waitForInput can only be called after run");
-    TezRawKeyValueIterator kvIter = null;
-    try {
-      kvIter = runShuffleFuture.get();
-    } catch (ExecutionException e) {
-      Throwable cause = e.getCause();
-      handleThrowable(cause);
-    }
-    if (isShutDown.get()) {
-      throw new InputAlreadyClosedException();
-    }
-    if (throwable != null) {
-      handleThrowable(throwable);
-    }
-    return kvIter;
-  }
-
-  public void run() throws IOException {
-    merger.configureAndStart();
-    runShuffleFuture = executor.submit(runShuffleCallable);
-    Futures.addCallback(runShuffleFuture, new ShuffleRunnerFutureCallback());
-    executor.shutdown();
-  }
-
-  public void shutdown() {
-    if (!isShutDown.getAndSet(true)) {
-      // Interrupt so that the scheduler / merger sees this interrupt.
-      LOG.info("Shutting down Shuffle for source: " + srcNameTrimmed);
-      runShuffleFuture.cancel(true);
-      cleanupIgnoreErrors();
-    }
-  }
-
-  private class RunShuffleCallable implements Callable<TezRawKeyValueIterator> {
-    @Override
-    public TezRawKeyValueIterator call() throws IOException, InterruptedException {
-
-      synchronized (this) {
-        for (int i = 0; i < numFetchers; ++i) {
-          Fetcher fetcher = new Fetcher(httpConnectionParams, scheduler, merger,
-            metrics, Shuffle.this, jobTokenSecret, ifileReadAhead, ifileReadAheadLength,
-            codec, inputContext, conf, localDiskFetchEnabled);
-          fetchers.add(fetcher);
-          fetcher.start();
-        }
-      }
-
-      
-      while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {
-        synchronized (this) {
-          if (throwable != null) {
-            throw new ShuffleError("error in shuffle in " + throwingThreadName,
-                                   throwable);
-          }
-        }
-      }
-      
-      // Stop the map-output fetcher threads
-      cleanupFetchers(false);
-      
-      // stop the scheduler
-      cleanupShuffleScheduler(false);
-
-      // Finish the on-going merges...
-      TezRawKeyValueIterator kvIter = null;
-      try {
-        kvIter = merger.close();
-      } catch (Throwable e) {
-        throw new ShuffleError("Error while doing final merge", e);
-      }
-      
-      // Sanity check
-      synchronized (Shuffle.this) {
-        if (throwable != null) {
-          throw new ShuffleError("error in shuffle in " + throwingThreadName,
-                                 throwable);
-        }
-      }
-
-      inputContext.inputIsReady();
-      LOG.info("merge complete for input vertex : " + inputContext.getSourceVertexName());
-      return kvIter;
-    }
-  }
-  
-  private synchronized void cleanupFetchers(boolean ignoreErrors) throws InterruptedException {
-    // Stop the fetcher threads
-    InterruptedException ie = null;
-    if (!fetchersClosed.getAndSet(true)) {
-      for (Fetcher fetcher : fetchers) {
-        try {
-          fetcher.shutDown();
-        } catch (InterruptedException e) {
-          if (ignoreErrors) {
-            LOG.info("Interrupted while shutting down fetchers. Ignoring.");
-          } else {
-            if (ie == null) {
-              ie = e;
-            } else {
-              LOG.warn("Ignoring exception while shutting down fetcher since a previous one was seen and will be thrown "
-                  + e);
-            }
-          }
-        }
-      }
-      fetchers.clear();
-      //All threads are shutdown.  It is safe to shutdown SSL factory
-      if (httpConnectionParams.isSSLShuffleEnabled()) {
-        HttpConnection.cleanupSSLFactory();
-      }
-      // throw only the first exception while attempting to shutdown.
-      if (ie != null) {
-        throw ie;
-      }
-    }
-  }
-
-  private void cleanupShuffleScheduler(boolean ignoreErrors) throws InterruptedException {
-
-    if (!schedulerClosed.getAndSet(true)) {
-      try {
-        scheduler.close();
-      } catch (InterruptedException e) {
-        if (ignoreErrors) {
-          LOG.info("Interrupted while attempting to close the scheduler during cleanup. Ignoring");
-        } else {
-          throw e;
-        }
-      }
-    }
-  }
-
-  private void cleanupMerger(boolean ignoreErrors) throws Throwable {
-    if (!mergerClosed.getAndSet(true)) {
-      try {
-        merger.close();
-      } catch (Throwable e) {
-        if (ignoreErrors) {
-          LOG.info("Exception while trying to shutdown merger, Ignoring", e);
-        } else {
-          throw e;
-        }
-      }
-    }
-  }
-
-  private void cleanupIgnoreErrors() {
-    try {
-      cleanupFetchers(true);
-      cleanupShuffleScheduler(true);
-      cleanupMerger(true);
-    } catch (Throwable t) {
-      // Ignore
-    }
-  }
-
-  @Private
-  public synchronized void reportException(Throwable t) {
-    if (throwable == null) {
-      throwable = t;
-      throwingThreadName = Thread.currentThread().getName();
-      // Notify the scheduler so that the reporting thread finds the 
-      // exception immediately.
-      synchronized (scheduler) {
-        scheduler.notifyAll();
-      }
-    }
-  }
-  
-  public static class ShuffleError extends IOException {
-    private static final long serialVersionUID = 5753909320586607881L;
-
-    ShuffleError(String msg, Throwable t) {
-      super(msg, t);
-    }
-  }
-
-  @Private
-  public static long getInitialMemoryRequirement(Configuration conf, long maxAvailableTaskMemory) {
-    return MergeManager.getInitialMemoryRequirement(conf, maxAvailableTaskMemory);
-  }
-  
-  private class ShuffleRunnerFutureCallback implements FutureCallback<TezRawKeyValueIterator> {
-    @Override
-    public void onSuccess(TezRawKeyValueIterator result) {
-      LOG.info("Shuffle Runner thread complete");
-    }
-
-    @Override
-    public void onFailure(Throwable t) {
-      // ZZZ Handle failures during shutdown.
-      if (isShutDown.get()) {
-        LOG.info("Already shutdown. Ignoring error: ",  t);
-      } else {
-        LOG.error("ShuffleRunner failed with error", t);
-        inputContext.fatalError(t, "Shuffle Runner Failed");
-        cleanupIgnoreErrors();
-      }
-    }
-  }
-}
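
For readers tracing the removed Shuffle class above: run() submits a RunShuffleCallable to a single-thread listening executor, isInputReady() polls the resulting future, waitForInput() blocks on it, and shutdown() cancels it. The stand-alone sketch below illustrates that same Guava future-plus-callback pattern in isolation; the class name ShuffleDemo and the dummy callable are illustrative only and are not part of Tez.

// Minimal sketch of the run-future pattern used by the removed Shuffle class:
// a single-threaded ListeningExecutorService runs the shuffle-and-merge work,
// and a FutureCallback observes success or failure.
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

public class ShuffleDemo {

  public static void main(String[] args) throws Exception {
    ExecutorService rawExecutor = Executors.newFixedThreadPool(1,
        new ThreadFactoryBuilder().setDaemon(true)
            .setNameFormat("ShuffleAndMergeRunner [demo]").build());
    ListeningExecutorService executor = MoreExecutors.listeningDecorator(rawExecutor);

    // Stand-in for RunShuffleCallable: pretend to fetch and merge, then return a result.
    Callable<String> runShuffle = new Callable<String>() {
      @Override
      public String call() throws Exception {
        Thread.sleep(100); // fetchers and the final merge would run here
        return "merged-iterator";
      }
    };

    ListenableFuture<String> runShuffleFuture = executor.submit(runShuffle);
    // Same two-argument overload the removed code uses; newer Guava versions
    // require a third Executor argument (e.g. MoreExecutors.directExecutor()).
    Futures.addCallback(runShuffleFuture, new FutureCallback<String>() {
      @Override
      public void onSuccess(String result) {
        System.out.println("Shuffle Runner thread complete: " + result);
      }

      @Override
      public void onFailure(Throwable t) {
        System.err.println("ShuffleRunner failed: " + t);
      }
    });
    executor.shutdown();

    // waitForInput() equivalent: block until the shuffle-and-merge future resolves.
    System.out.println("Result: " + runShuffleFuture.get());
  }
}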

http://git-wip-us.apache.org/repos/asf/tez/blob/196bcca9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleClientMetrics.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleClientMetrics.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleClientMetrics.java
deleted file mode 100644
index f09c22f..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleClientMetrics.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.runtime.library.common.shuffle.impl;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.metrics.MetricsContext;
-import org.apache.hadoop.metrics.MetricsRecord;
-import org.apache.hadoop.metrics.MetricsUtil;
-import org.apache.hadoop.metrics.Updater;
-import org.apache.tez.common.TezRuntimeFrameworkConfigs;
-import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
-import org.apache.tez.runtime.library.common.Constants;
-import org.apache.tez.runtime.library.common.TezRuntimeUtils;
-
-class ShuffleClientMetrics implements Updater {
-
-  private MetricsRecord shuffleMetrics = null;
-  private int numFailedFetches = 0;
-  private int numSuccessFetches = 0;
-  private long numBytes = 0;
-  private int numThreadsBusy = 0;
-  private final int numCopiers;
-  
-  ShuffleClientMetrics(String dagName, String vertexName, int taskIndex, Configuration conf, 
-      String user) {
-    this.numCopiers = 
-        conf.getInt(
-            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES, 
-            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES_DEFAULT);
-
-    MetricsContext metricsContext = MetricsUtil.getContext(Constants.TEZ);
-    this.shuffleMetrics = 
-      MetricsUtil.createRecord(metricsContext, "shuffleInput");
-    this.shuffleMetrics.setTag("user", user);
-    this.shuffleMetrics.setTag("dagName", dagName);
-    this.shuffleMetrics.setTag("taskId", TezRuntimeUtils.getTaskIdentifier(vertexName, taskIndex));
-    this.shuffleMetrics.setTag("sessionId", 
-        conf.get(
-            TezRuntimeFrameworkConfigs.TEZ_RUNTIME_METRICS_SESSION_ID,
-            TezRuntimeFrameworkConfigs.TEZ_RUNTIME_METRICS_SESSION_ID_DEFAULT));
-    metricsContext.registerUpdater(this);
-  }
-  public synchronized void inputBytes(long numBytes) {
-    this.numBytes += numBytes;
-  }
-  public synchronized void failedFetch() {
-    ++numFailedFetches;
-  }
-  public synchronized void successFetch() {
-    ++numSuccessFetches;
-  }
-  public synchronized void threadBusy() {
-    ++numThreadsBusy;
-  }
-  public synchronized void threadFree() {
-    --numThreadsBusy;
-  }
-  public void doUpdates(MetricsContext unused) {
-    synchronized (this) {
-      shuffleMetrics.incrMetric("shuffle_input_bytes", numBytes);
-      shuffleMetrics.incrMetric("shuffle_failed_fetches", 
-                                numFailedFetches);
-      shuffleMetrics.incrMetric("shuffle_success_fetches", 
-                                numSuccessFetches);
-      if (numCopiers != 0) {
-        shuffleMetrics.setMetric("shuffle_fetchers_busy_percent",
-            100*((float)numThreadsBusy/numCopiers));
-      } else {
-        shuffleMetrics.setMetric("shuffle_fetchers_busy_percent", 0);
-      }
-      numBytes = 0;
-      numSuccessFetches = 0;
-      numFailedFetches = 0;
-    }
-    shuffleMetrics.update();
-  }
-}
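
ShuffleClientMetrics above registers itself as an Updater with the old org.apache.hadoop.metrics context, so doUpdates() is invoked on each reporting interval. The dependency-free sketch below (the class name is made up and plain Java is used instead of the deprecated metrics API) shows the behaviour that matters: byte and fetch counters are emitted as per-interval deltas and then reset, while the busy percentage is a point-in-time gauge guarded against a zero copier count.

// Illustrative-only sketch (not Tez code) of what ShuffleClientMetrics.doUpdates()
// publishes: cumulative counters are flushed and reset on every push, while the
// busy percentage is a point-in-time gauge guarded against division by zero.
public class ShuffleMetricsSketch {
  private long numBytes = 0;
  private int numFailedFetches = 0;
  private int numSuccessFetches = 0;
  private int numThreadsBusy = 0;
  private final int numCopiers;

  public ShuffleMetricsSketch(int numCopiers) {
    this.numCopiers = numCopiers;
  }

  public synchronized void inputBytes(long bytes) { numBytes += bytes; }
  public synchronized void successFetch() { ++numSuccessFetches; }
  public synchronized void failedFetch() { ++numFailedFetches; }
  public synchronized void threadBusy() { ++numThreadsBusy; }
  public synchronized void threadFree() { --numThreadsBusy; }

  public synchronized void doUpdates() {
    float busyPercent = numCopiers != 0
        ? 100 * ((float) numThreadsBusy / numCopiers)
        : 0f;
    System.out.printf("bytes=%d ok=%d failed=%d busy=%.1f%%%n",
        numBytes, numSuccessFetches, numFailedFetches, busyPercent);
    // Counters are deltas per reporting interval, so reset after each push.
    numBytes = 0;
    numSuccessFetches = 0;
    numFailedFetches = 0;
  }
}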

http://git-wip-us.apache.org/repos/asf/tez/blob/196bcca9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleHeader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleHeader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleHeader.java
deleted file mode 100644
index 327473e..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleHeader.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.runtime.library.common.shuffle.impl;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-
-/**
- * Shuffle header information that is sent by the TaskTracker and
- * deciphered by the Fetcher thread of the Reduce task.
- *
- */
-@InterfaceAudience.Private
-@InterfaceStability.Stable
-public class ShuffleHeader implements Writable {
-  
-  /** Header info of the shuffle http request/response */
-  public static final String HTTP_HEADER_NAME = "name";
-  public static final String DEFAULT_HTTP_HEADER_NAME = "mapreduce";
-  public static final String HTTP_HEADER_VERSION = "version";
-  public static final String DEFAULT_HTTP_HEADER_VERSION = "1.0.0";
-
-  /**
-   * The longest possible length of task attempt id that we will accept.
-   */
-  private static final int MAX_ID_LENGTH = 1000;
-
-  String mapId;
-  long uncompressedLength;
-  long compressedLength;
-  int forReduce;
-  
-  public ShuffleHeader() { }
-  
-  public ShuffleHeader(String mapId, long compressedLength,
-      long uncompressedLength, int forReduce) {
-    this.mapId = mapId;
-    this.compressedLength = compressedLength;
-    this.uncompressedLength = uncompressedLength;
-    this.forReduce = forReduce;
-  }
-  
-  public String getMapId() {
-    return this.mapId;
-  }
-  
-  public int getPartition() {
-    return this.forReduce;
-  }
-  
-  public long getUncompressedLength() {
-    return uncompressedLength;
-  }
-
-  public long getCompressedLength() {
-    return compressedLength;
-  }
-
-  public void readFields(DataInput in) throws IOException {
-    mapId = WritableUtils.readStringSafely(in, MAX_ID_LENGTH);
-    compressedLength = WritableUtils.readVLong(in);
-    uncompressedLength = WritableUtils.readVLong(in);
-    forReduce = WritableUtils.readVInt(in);
-  }
-
-  public void write(DataOutput out) throws IOException {
-    Text.writeString(out, mapId);
-    WritableUtils.writeVLong(out, compressedLength);
-    WritableUtils.writeVLong(out, uncompressedLength);
-    WritableUtils.writeVInt(out, forReduce);
-  }
-}
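
The removed ShuffleHeader defines the small wire format carried on each shuffle HTTP response: a length-prefixed map id string, two vlong lengths, and a vint partition number. The self-contained round trip below uses the same Hadoop Writable helpers the class itself calls; the map id string and the length values are made-up sample data.

// Stand-alone sketch (not part of the patch) of the four-field wire format that
// ShuffleHeader.write()/readFields() produce: a length-prefixed map id string
// followed by vlong compressed length, vlong uncompressed length and vint partition.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;

public class ShuffleHeaderRoundTrip {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);

    // Serialize exactly the way ShuffleHeader.write() does.
    Text.writeString(out, "attempt_0_0001_m_000004_0_10003"); // sample map id
    WritableUtils.writeVLong(out, 2048L);   // compressedLength
    WritableUtils.writeVLong(out, 4096L);   // uncompressedLength
    WritableUtils.writeVInt(out, 7);        // forReduce (partition)
    out.flush();

    // Deserialize the way ShuffleHeader.readFields() does.
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
    String mapId = WritableUtils.readStringSafely(in, 1000); // 1000 mirrors MAX_ID_LENGTH
    long compressed = WritableUtils.readVLong(in);
    long uncompressed = WritableUtils.readVLong(in);
    int partition = WritableUtils.readVInt(in);

    System.out.println(mapId + " compressed=" + compressed
        + " uncompressed=" + uncompressed + " partition=" + partition);
  }
}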

http://git-wip-us.apache.org/repos/asf/tez/blob/196bcca9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
deleted file mode 100644
index 2229231..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.runtime.library.common.shuffle.impl;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.BitSet;
-import java.util.List;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.ByteString;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tez.common.TezCommonUtils;
-import org.apache.tez.common.TezUtilsInternal;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.InputContext;
-import org.apache.tez.runtime.api.events.DataMovementEvent;
-import org.apache.tez.runtime.api.events.InputFailedEvent;
-import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
-import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
-import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
-
-import com.google.protobuf.InvalidProtocolBufferException;
-
-public class ShuffleInputEventHandler {
-  
-  private static final Log LOG = LogFactory.getLog(ShuffleInputEventHandler.class);
-
-  private final ShuffleScheduler scheduler;
-  private final InputContext inputContext;
-
-  private int maxMapRuntime = 0;
-  private final boolean sslShuffle;
-
-  public ShuffleInputEventHandler(InputContext inputContext,
-      ShuffleScheduler scheduler, boolean sslShuffle) {
-    this.inputContext = inputContext;
-    this.scheduler = scheduler;
-    this.sslShuffle = sslShuffle;
-  }
-
-  public void handleEvents(List<Event> events) throws IOException {
-    for (Event event : events) {
-      handleEvent(event);
-    }
-  }
-  
-  
-  private void handleEvent(Event event) throws IOException {
-    if (event instanceof DataMovementEvent) {
-      processDataMovementEvent((DataMovementEvent) event);      
-    } else if (event instanceof InputFailedEvent) {
-      processTaskFailedEvent((InputFailedEvent) event);
-    }
-  }
-
-  private void processDataMovementEvent(DataMovementEvent dmEvent) throws IOException {
-    DataMovementEventPayloadProto shufflePayload;
-    try {
-      shufflePayload = DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(dmEvent.getUserPayload()));
-    } catch (InvalidProtocolBufferException e) {
-      throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e);
-    } 
-    int partitionId = dmEvent.getSourceIndex();
-    LOG.info("DME srcIdx: " + partitionId + ", targetIdx: " + dmEvent.getTargetIndex()
-        + ", attemptNum: " + dmEvent.getVersion() + ", payload: " + ShuffleUtils.stringify(shufflePayload));
-    // TODO NEWTEZ See if this duration hack can be removed.
-    int duration = shufflePayload.getRunDuration();
-    if (duration > maxMapRuntime) {
-      maxMapRuntime = duration;
-      scheduler.informMaxMapRunTime(maxMapRuntime);
-    }
-    if (shufflePayload.hasEmptyPartitions()) {
-      try {
-        byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload.getEmptyPartitions());
-        BitSet emptyPartitionsBitSet = TezUtilsInternal.fromByteArray(emptyPartitions);
-        if (emptyPartitionsBitSet.get(partitionId)) {
-          InputAttemptIdentifier srcAttemptIdentifier =
-              new InputAttemptIdentifier(dmEvent.getTargetIndex(), dmEvent.getVersion());
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(
-                "Source partition: " + partitionId + " did not generate any data. SrcAttempt: ["
-                    + srcAttemptIdentifier + "]. Not fetching.");
-          }
-          scheduler.copySucceeded(srcAttemptIdentifier, null, 0, 0, 0, null);
-          return;
-        }
-      } catch (IOException e) {
-        throw new TezUncheckedException("Unable to set " +
-                "the empty partition to succeeded", e);
-      }
-    }
-
-    InputAttemptIdentifier srcAttemptIdentifier =
-        new InputAttemptIdentifier(dmEvent.getTargetIndex(), dmEvent.getVersion(),
-            shufflePayload.getPathComponent());
-
-    URI baseUri = getBaseURI(shufflePayload.getHost(), shufflePayload.getPort(), partitionId);
-    scheduler.addKnownMapOutput(shufflePayload.getHost(), shufflePayload.getPort(),
-        partitionId, baseUri.toString(), srcAttemptIdentifier);
-  }
-  
-  private void processTaskFailedEvent(InputFailedEvent ifEvent) {
-    InputAttemptIdentifier taIdentifier = new InputAttemptIdentifier(ifEvent.getTargetIndex(), ifEvent.getVersion());
-    scheduler.obsoleteInput(taIdentifier);
-    LOG.info("Obsoleting output of src-task: " + taIdentifier);
-  }
-
-  // TODO NEWTEZ Handle encrypted shuffle
-  @VisibleForTesting
-  URI getBaseURI(String host, int port, int partitionId) {
-    StringBuilder sb = ShuffleUtils.constructBaseURIForShuffleHandler(host, port,
-      partitionId, inputContext.getApplicationId().toString(), sslShuffle);
-    URI u = URI.create(sb.toString());
-    return u;
-  }
-
-}
-
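
One behaviour worth noting in the removed handler above is the empty-partition short circuit: the DataMovementEvent payload may carry a compressed BitSet with one bit per source partition, and when the consumer's bit is set the attempt is recorded as succeeded with zero bytes instead of being fetched. The plain-JDK sketch below illustrates only the bitset idea; the real payload is additionally compressed and decoded through TezCommonUtils and TezUtilsInternal, so the actual byte layout differs.

// Plain-JDK illustration (not Tez code) of the empty-partition short-circuit in
// processDataMovementEvent(): the producer publishes a BitSet with one bit per
// partition, and a consumer that finds its own partition bit set records the
// input as complete without scheduling a fetch.
import java.util.BitSet;

public class EmptyPartitionCheckSketch {
  public static void main(String[] args) {
    int numPartitions = 4;

    // Producer side: partitions 1 and 3 generated no data.
    BitSet emptyPartitions = new BitSet(numPartitions);
    emptyPartitions.set(1);
    emptyPartitions.set(3);
    byte[] payload = emptyPartitions.toByteArray(); // shipped inside the DME payload

    // Consumer side: each consumer checks the bit for the partition it reads.
    BitSet decoded = BitSet.valueOf(payload);
    int myPartition = 3;
    if (decoded.get(myPartition)) {
      System.out.println("Partition " + myPartition + " is empty, not fetching.");
    } else {
      System.out.println("Partition " + myPartition + " has data, schedule a fetch.");
    }
  }
}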

http://git-wip-us.apache.org/repos/asf/tez/blob/196bcca9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
new file mode 100644
index 0000000..6736282
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.tez.runtime.library.common.shuffle.impl;
+
+import java.io.IOException;
+import java.util.BitSet;
+import java.util.List;
+
+import com.google.protobuf.ByteString;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.InputFailedEvent;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.shuffle.DiskFetchedInput;
+import org.apache.tez.runtime.library.common.shuffle.FetchedInput;
+import org.apache.tez.runtime.library.common.shuffle.FetchedInputAllocator;
+import org.apache.tez.runtime.library.common.shuffle.MemoryFetchedInput;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleEventHandler;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataProto;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * A base class for generic Event handling for Inputs which need to Shuffle data.
+ */
+public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
+
+  private static final Log LOG = LogFactory.getLog(ShuffleInputEventHandlerImpl.class);
+  
+  private final ShuffleManager shuffleManager;
+  private final FetchedInputAllocator inputAllocator;
+  private final CompressionCodec codec;
+  private final boolean ifileReadAhead;
+  private final int ifileReadAheadLength;
+  private final boolean useSharedInputs;
+
+  public ShuffleInputEventHandlerImpl(InputContext inputContext,
+                                      ShuffleManager shuffleManager,
+                                      FetchedInputAllocator inputAllocator, CompressionCodec codec,
+                                      boolean ifileReadAhead, int ifileReadAheadLength) {
+    this.shuffleManager = shuffleManager;
+    this.inputAllocator = inputAllocator;
+    this.codec = codec;
+    this.ifileReadAhead = ifileReadAhead;
+    this.ifileReadAheadLength = ifileReadAheadLength;
+    // this currently relies on a user to enable the flag
+    // expand on idea based on vertex parallelism and num inputs
+    this.useSharedInputs = (inputContext.getTaskAttemptNumber() == 0);
+  }
+
+  @Override
+  public void handleEvents(List<Event> events) throws IOException {
+    for (Event event : events) {
+      handleEvent(event);
+    }
+  }
+  
+  private void handleEvent(Event event) throws IOException {
+    if (event instanceof DataMovementEvent) {
+      processDataMovementEvent((DataMovementEvent)event);
+    } else if (event instanceof InputFailedEvent) {
+      processInputFailedEvent((InputFailedEvent)event);
+    } else {
+      throw new TezUncheckedException("Unexpected event type: " + event.getClass().getName());
+    }
+  }
+
+  private void processDataMovementEvent(DataMovementEvent dme) throws IOException {
+    DataMovementEventPayloadProto shufflePayload;
+    try {
+      shufflePayload = DataMovementEventPayloadProto.parseFrom(
+          ByteString.copyFrom(dme.getUserPayload()));
+    } catch (InvalidProtocolBufferException e) {
+      throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e);
+    }
+    int srcIndex = dme.getSourceIndex();
+    String hostIdentifier = shufflePayload.getHost() + ":" + shufflePayload.getPort();
+    LOG.info("DME srcIdx: " + srcIndex + ", targetIndex: " + dme.getTargetIndex()
+        + ", attemptNum: " + dme.getVersion() + ", payload: " + ShuffleUtils
+        .stringify(shufflePayload));
+
+    if (shufflePayload.hasEmptyPartitions()) {
+      byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload
+          .getEmptyPartitions());
+      BitSet emptyPartitionsBitSet = TezUtilsInternal.fromByteArray(emptyPartitions);
+      if (emptyPartitionsBitSet.get(srcIndex)) {
+        InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(dme.getTargetIndex(),
+            dme.getVersion());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Source partition: " + srcIndex + " did not generate any data. SrcAttempt: ["
+              + srcAttemptIdentifier + "]. Not fetching.");
+        }
+        shuffleManager.addCompletedInputWithNoData(srcAttemptIdentifier);
+        return;
+      }
+    }
+
+    InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(
+        dme.getTargetIndex(), dme.getVersion(),
+        shufflePayload.getPathComponent(), (useSharedInputs && srcIndex == 0));
+
+    if (shufflePayload.hasData()) {
+      DataProto dataProto = shufflePayload.getData();
+      FetchedInput fetchedInput = inputAllocator.allocate(dataProto.getRawLength(),
+          dataProto.getCompressedLength(), srcAttemptIdentifier);
+      moveDataToFetchedInput(dataProto, fetchedInput, hostIdentifier);
+      shuffleManager.addCompletedInputWithData(srcAttemptIdentifier, fetchedInput);
+    } else {
+      shuffleManager.addKnownInput(shufflePayload.getHost(),
+          shufflePayload.getPort(), srcAttemptIdentifier, srcIndex);
+    }
+
+  }
+
+  private void moveDataToFetchedInput(DataProto dataProto,
+      FetchedInput fetchedInput, String hostIdentifier) throws IOException {
+    switch (fetchedInput.getType()) {
+    case DISK:
+      ShuffleUtils.shuffleToDisk(((DiskFetchedInput) fetchedInput).getOutputStream(),
+        hostIdentifier, dataProto.getData().newInput(), dataProto.getCompressedLength(), LOG,
+          fetchedInput.getInputAttemptIdentifier().toString());
+      break;
+    case MEMORY:
+      ShuffleUtils.shuffleToMemory(((MemoryFetchedInput) fetchedInput).getBytes(),
+        dataProto.getData().newInput(), dataProto.getRawLength(), dataProto.getCompressedLength(),
+        codec, ifileReadAhead, ifileReadAheadLength, LOG,
+        fetchedInput.getInputAttemptIdentifier().toString());
+      break;
+    case WAIT:
+    default:
+      throw new TezUncheckedException("Unexpected type: "
+          + fetchedInput.getType());
+    }
+  }
+  
+  private void processInputFailedEvent(InputFailedEvent ife) {
+    InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(ife.getTargetIndex(), ife.getVersion());
+    shuffleManager.obsoleteKnownInput(srcAttemptIdentifier);
+  }
+
+}
+
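
The new ShuffleInputEventHandlerImpl adds a path the old handler did not have: when shufflePayload.hasData() is true the event itself carries the bytes, and moveDataToFetchedInput() copies them into whichever FetchedInput the allocator returned, in memory or on disk. The stripped-down sketch below mirrors only that routing switch; the enum and method names are stand-ins rather than the Tez types, and the real code delegates to the ShuffleUtils.shuffleToMemory and shuffleToDisk calls visible above, which also handle decompression and read-ahead.

// Not from the patch: a simplified illustration of the routing performed by
// moveDataToFetchedInput(). Inline DME data is written either into a pre-sized
// in-memory buffer or onto a local-disk stream instead of being fetched over HTTP.
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class InlineDataRoutingSketch {

  enum FetchedInputType { MEMORY, DISK, WAIT }

  static void moveData(FetchedInputType type, InputStream data,
      byte[] memoryBuffer, OutputStream diskStream) throws IOException {
    switch (type) {
      case MEMORY: {
        // Analogous to ShuffleUtils.shuffleToMemory: fill the exact-sized buffer.
        int offset = 0;
        int read;
        while (offset < memoryBuffer.length
            && (read = data.read(memoryBuffer, offset, memoryBuffer.length - offset)) != -1) {
          offset += read;
        }
        break;
      }
      case DISK: {
        // Analogous to ShuffleUtils.shuffleToDisk: stream the payload to local disk.
        byte[] buffer = new byte[4096];
        int read;
        while ((read = data.read(buffer)) != -1) {
          diskStream.write(buffer, 0, read);
        }
        break;
      }
      case WAIT:
      default:
        throw new IllegalStateException("Unexpected type: " + type);
    }
  }
}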

