tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rbalamo...@apache.org
Subject tez git commit: TEZ-2196. Consider reusing UnorderedPartitionedKVWriter with single output in UnorderedKVOutput (rbalamohan)
Date Thu, 26 Mar 2015 07:57:07 GMT
Repository: tez
Updated Branches:
  refs/heads/master 2fe2d6352 -> 1ba1f927c


TEZ-2196. Consider reusing UnorderedPartitionedKVWriter with single output in UnorderedKVOutput (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/1ba1f927
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/1ba1f927
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/1ba1f927

Branch: refs/heads/master
Commit: 1ba1f927c16a1d7c273b6cd1a8553e5269d1541a
Parents: 2fe2d63
Author: Rajesh Balamohan <rbalamohan@apache.org>
Authored: Thu Mar 26 13:25:57 2015 +0530
Committer: Rajesh Balamohan <rbalamohan@apache.org>
Committed: Thu Mar 26 13:25:57 2015 +0530

----------------------------------------------------------------------
 .../library/api/TezRuntimeConfiguration.java    |  14 --
 .../broadcast/output/FileBasedKVWriter.java     | 190 -------------------
 .../library/common/shuffle/ShuffleUtils.java    |   5 +-
 .../impl/ShuffleInputEventHandlerImpl.java      |  35 +---
 .../common/shuffle/impl/ShuffleManager.java     |  43 -----
 .../writers/UnorderedPartitionedKVWriter.java   |  70 ++++++-
 .../library/output/UnorderedKVOutput.java       | 117 +++++-------
 .../WeightedScalingMemoryDistributor.java       |   8 +-
 .../src/main/proto/ShufflePayloads.proto        |   1 -
 .../TestWeightedScalingMemoryDistributor.java   |   4 +-
 .../TestUnorderedPartitionedKVWriter.java       |  15 +-
 .../output/TestOnFileUnorderedKVOutput.java     | 152 +++++++++++----
 12 files changed, 238 insertions(+), 416 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/1ba1f927/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
index 565b47a..9f11e4d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
@@ -258,18 +258,6 @@ public class TezRuntimeConfiguration {
           "empty.partitions.info-via-events.enabled";
   public static final boolean TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED_DEFAULT = true;
 
-  @Private
-  public static final String TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_ENABLED =
-      TEZ_RUNTIME_PREFIX + "transfer.data-via-events.enabled";
-  @Private
-  public static final boolean TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_ENABLED_DEFAULT = false;
-
-  @Private
-  public static final String TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE =
-      TEZ_RUNTIME_PREFIX + "transfer.data-via-events.max-size";
-  @Private
-  public static final int TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE_DEFAULT = 200 << 10; // 200KB
-
   /**
    * If the shuffle input is on the local host bypass the http fetch and access the files directly
    */
@@ -369,8 +357,6 @@ public class TezRuntimeConfiguration {
     tezRuntimeKeys.add(TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED);
     tezRuntimeKeys.add(TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED);
     tezRuntimeKeys.add(TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT);
-    tezRuntimeKeys.add(TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_ENABLED);
-    tezRuntimeKeys.add(TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE);
     tezRuntimeKeys.add(TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS);
     tezRuntimeKeys.add(TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH);
     tezRuntimeKeys.add(TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH);

http://git-wip-us.apache.org/repos/asf/tez/blob/1ba1f927/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
deleted file mode 100644
index 95cea82..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.runtime.library.broadcast.output;
-
-import java.io.IOException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.counters.TaskCounter;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.runtime.api.OutputContext;
-import org.apache.tez.runtime.library.api.KeyValuesWriter;
-import org.apache.tez.runtime.library.common.ConfigUtils;
-import org.apache.tez.runtime.library.common.TezRuntimeUtils;
-import org.apache.tez.runtime.library.common.sort.impl.IFile;
-import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
-import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
-import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
-
-import com.google.common.base.Preconditions;
-
-@Private
-public class FileBasedKVWriter extends KeyValuesWriter {
-
-  private static final Logger LOG = LoggerFactory.getLogger(FileBasedKVWriter.class);
-  
-  public static final int INDEX_RECORD_LENGTH = 24;
-
-  private final Configuration conf;
-  private int numRecords = 0;
-
-  @SuppressWarnings("rawtypes")
-  private final Class keyClass;
-  @SuppressWarnings("rawtypes")
-  private final Class valClass;
-  private final CompressionCodec codec;
-  private final FileSystem rfs;
-  private final IFile.Writer writer;
-
-  private final Path outputPath;
-  private Path indexPath;
-  
-  private final TezTaskOutput ouputFileManager;
-  private boolean closed = false;
-
-  // Number of output key-value pairs
-  private final TezCounter outputRecordsCounter;
-  // Number of bytes of actual output - uncompressed.
-  private final TezCounter outputBytesCounter;
-  // Size of the data with additional meta-data
-  private final TezCounter outputBytesCounterWithOverhead;
-  // Actual physical size of the data on disk.
-  private final TezCounter outputMaterializedBytesCounter;
-  
-  
-  // TODO NEWTEZ Define Counters
-  // Number of records
-  // Time waiting for a write to complete, if that's possible.
-  // Size of key-value pairs written.
-
-  public FileBasedKVWriter(OutputContext outputContext, Configuration conf) throws IOException {
-    this.conf = conf;
-
-    this.outputRecordsCounter = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_RECORDS);
-    this.outputBytesCounter = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES);
-    this.outputBytesCounterWithOverhead = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES_WITH_OVERHEAD);
-    this.outputMaterializedBytesCounter = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES_PHYSICAL);
-
-    this.rfs = ((LocalFileSystem) FileSystem.getLocal(this.conf)).getRaw();
-
-    // Setup serialization
-    keyClass = ConfigUtils.getIntermediateOutputKeyClass(this.conf);
-    valClass = ConfigUtils.getIntermediateOutputValueClass(this.conf);
-
-    // Setup compression
-    if (ConfigUtils.shouldCompressIntermediateOutput(this.conf)) {
-      Class<? extends CompressionCodec> codecClass = ConfigUtils
-          .getIntermediateOutputCompressorClass(this.conf, DefaultCodec.class);
-      codec = ReflectionUtils.newInstance(codecClass, this.conf);
-    } else {
-      codec = null;
-    }
-
-    this.ouputFileManager = TezRuntimeUtils.instantiateTaskOutputManager(conf,
-        outputContext);
-    LOG.info("Created KVWriter -> " + "compressionCodec: " + (codec == null ? "NoCompressionCodec"
-        : codec.getClass().getName()));
-
-    this.outputPath = ouputFileManager.getOutputFileForWrite();
-    LOG.info("Writing data file: " + outputPath);
-
-    // TODO NEWTEZ Consider making the buffer size configurable. Also consider
-    // setting up an in-memory buffer which is occasionally flushed to disk so
-    // that the output does not block.
-
-    // TODO NEWTEZ maybe use appropriate counter
-    this.writer = new IFile.Writer(conf, rfs, outputPath, keyClass, valClass,
-        codec, null, outputBytesCounter);
-  }
-
-  /**
-   * @return true if any output was generated. false otherwise
-   * @throws IOException
-   */
-  public boolean close() throws IOException {
-    this.closed = true;
-    this.writer.close();
-    long rawLen = writer.getRawLength();
-    long compLen = writer.getCompressedLength();
-    outputBytesCounterWithOverhead.increment(rawLen);
-    outputMaterializedBytesCounter.increment(compLen);
-    TezIndexRecord rec = new TezIndexRecord(0, rawLen, compLen);
-    TezSpillRecord sr = new TezSpillRecord(1);
-    sr.putIndex(rec, 0);
-
-    this.indexPath = ouputFileManager
-        .getOutputIndexFileForWrite(INDEX_RECORD_LENGTH);
-    LOG.info("Writing index file: " + indexPath);
-    sr.writeToFile(indexPath, conf);
-    return numRecords > 0;
-  }
-
-  @Override
-  public void write(Object key, Object value) throws IOException {
-    this.writer.append(key, value);
-    this.outputRecordsCounter.increment(1);
-    numRecords++;
-  }
-
-  @Override
-  public void write(Object key, Iterable<Object> values) throws IOException {
-    writer.appendKeyValues(key, values.iterator());
-    this.outputRecordsCounter.increment(1);
-    numRecords++;
-  }
-
-  public long getRawLength() {
-    Preconditions.checkState(closed, "Only available after the Writer has been closed");
-    return this.writer.getRawLength();
-  }
-  
-  public long getCompressedLength() {
-    Preconditions.checkState(closed, "Only available after the Writer has been closed");
-    return this.writer.getCompressedLength();
-  }
-
-  public byte[] getData() throws IOException {
-    Preconditions.checkState(closed,
-        "Only available after the Writer has been closed");
-    FSDataInputStream inStream = null;
-    byte[] buf = null;
-    try {
-      inStream = rfs.open(outputPath);
-      buf = new byte[(int) getCompressedLength()];
-      IOUtils.readFully(inStream, buf, 0, (int) getCompressedLength());
-    } finally {
-      if (inStream != null) {
-        inStream.close();
-      }
-    }
-    return buf;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/1ba1f927/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index a9e34f1..9a8b6b5 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -262,10 +262,7 @@ public class ShuffleUtils {
     sb.append("host: " + dmProto.getHost()).append(", ");
     sb.append("port: " + dmProto.getPort()).append(", ");
     sb.append("pathComponent: " + dmProto.getPathComponent()).append(", ");
-    sb.append("runDuration: " + dmProto.getRunDuration()).append(", ");
-    if (dmProto.hasData()) {
-      sb.append(", ").append("hasDataInEvent: " + dmProto.hasData());
-    }
+    sb.append("runDuration: " + dmProto.getRunDuration());
     sb.append("]");
     return sb.toString();
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/1ba1f927/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
index 1486848..61b3e3a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
@@ -104,7 +104,6 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
       throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e);
     }
     int srcIndex = dme.getSourceIndex();
-    String hostIdentifier = shufflePayload.getHost() + ":" + shufflePayload.getPort();
     LOG.info("DME srcIdx: " + srcIndex + ", targetIndex: " + dme.getTargetIndex()
         + ", attemptNum: " + dme.getVersion() + ", payload: " + ShuffleUtils
         .stringify(shufflePayload));
@@ -128,40 +127,10 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
     InputAttemptIdentifier srcAttemptIdentifier = constructInputAttemptIdentifier(dme,
         shufflePayload, (useSharedInputs && srcIndex == 0));
 
-    if (shufflePayload.hasData()) {
-      DataProto dataProto = shufflePayload.getData();
-      FetchedInput fetchedInput = inputAllocator.allocate(dataProto.getRawLength(),
-          dataProto.getCompressedLength(), srcAttemptIdentifier);
-      moveDataToFetchedInput(dataProto, fetchedInput, hostIdentifier);
-      shuffleManager.addCompletedInputWithData(srcAttemptIdentifier, fetchedInput);
-    } else {
-      shuffleManager.addKnownInput(shufflePayload.getHost(),
-          shufflePayload.getPort(), srcAttemptIdentifier, srcIndex);
-    }
-
+    shuffleManager.addKnownInput(shufflePayload.getHost(),
+        shufflePayload.getPort(), srcAttemptIdentifier, srcIndex);
   }
 
-  private void moveDataToFetchedInput(DataProto dataProto,
-      FetchedInput fetchedInput, String hostIdentifier) throws IOException {
-    switch (fetchedInput.getType()) {
-    case DISK:
-      ShuffleUtils.shuffleToDisk(((DiskFetchedInput) fetchedInput).getOutputStream(),
-        hostIdentifier, dataProto.getData().newInput(), dataProto.getCompressedLength(), LOG,
-          fetchedInput.getInputAttemptIdentifier().toString());
-      break;
-    case MEMORY:
-      ShuffleUtils.shuffleToMemory(((MemoryFetchedInput) fetchedInput).getBytes(),
-        dataProto.getData().newInput(), dataProto.getRawLength(), dataProto.getCompressedLength(),
-        codec, ifileReadAhead, ifileReadAheadLength, LOG,
-        fetchedInput.getInputAttemptIdentifier().toString());
-      break;
-    case WAIT:
-    default:
-      throw new TezUncheckedException("Unexpected type: "
-          + fetchedInput.getType());
-    }
-  }
-  
   private void processInputFailedEvent(InputFailedEvent ife) {
     InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(ife.getTargetIndex(), ife.getVersion());
     shuffleManager.obsoleteKnownInput(srcAttemptIdentifier);

http://git-wip-us.apache.org/repos/asf/tez/blob/1ba1f927/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index 9c53414..749143a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -508,49 +508,6 @@ public class ShuffleManager implements FetcherCallback {
     lastEventReceived.setValue(relativeTime);
   }
 
-  public void addCompletedInputWithData(
-      InputAttemptIdentifier srcAttemptIdentifier, FetchedInput fetchedInput)
-      throws IOException {
-    InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
-
-    LOG.info("Received Data via Event: " + srcAttemptIdentifier + " to "
-        + fetchedInput.getType());
-    // Count irrespective of whether this is a copy of an already fetched input
-    lock.lock();
-    try {
-      lastProgressTime = System.currentTimeMillis();
-    } finally {
-      lock.unlock();
-    }
-
-    boolean committed = false;
-    if (!completedInputSet.contains(inputIdentifier)) {
-      synchronized (completedInputSet) {
-        if (!completedInputSet.contains(inputIdentifier)) {
-          fetchedInput.commit();
-          committed = true;
-          if (!srcAttemptIdentifier.canRetrieveInputInChunks()) {
-            registerCompletedInput(fetchedInput);
-          } else {
-            registerCompletedInputForPipelinedShuffle(srcAttemptIdentifier, fetchedInput);
-          }
-        }
-      }
-    }
-    if (!committed) {
-      fetchedInput.abort(); // If this fails, the fetcher may attempt another
-                            // abort.
-    } else {
-      lock.lock();
-      try {
-        // Signal the wakeLoop to check for termination.
-        wakeLoop.signal();
-      } finally {
-        lock.unlock();
-      }
-    }
-  }
-
   public synchronized void obsoleteKnownInput(InputAttemptIdentifier srcAttemptIdentifier) {
     obsoletedInputs.add(srcAttemptIdentifier);
     // TODO NEWTEZ Maybe inform the fetcher about this. For now, this is used during the initial fetch list construction.

http://git-wip-us.apache.org/repos/asf/tez/blob/1ba1f927/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index 19f6130..7aac6f9 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -133,6 +133,10 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
   @VisibleForTesting
   Path finalOutPath;
 
+  //for single partition cases (e.g. UnorderedKVOutput)
+  private final IFile.Writer writer;
+  private final boolean skipBuffers;
+
   private final ReentrantLock spillLock = new ReentrantLock();
   private final Condition spillInProgress = spillLock.newCondition();
 
@@ -143,7 +147,22 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
   public UnorderedPartitionedKVWriter(OutputContext outputContext, Configuration conf,
       int numOutputs, long availableMemoryBytes) throws IOException {
     super(outputContext, conf, numOutputs);
-    Preconditions.checkArgument(availableMemoryBytes > 0, "availableMemory should not be > 0 bytes");
+    Preconditions.checkArgument(availableMemoryBytes >= 0, "availableMemory should be >= 0 bytes");
+
+    //Not checking for TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT as it might not add much value in
+    // this case.  Add it later if needed.
+    pipelinedShuffle = this.conf.getBoolean(TezRuntimeConfiguration
+        .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, TezRuntimeConfiguration
+        .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED_DEFAULT);
+
+    if (availableMemoryBytes == 0) {
+      Preconditions.checkArgument(((numPartitions == 1) && !pipelinedShuffle), "availableMemory "
+          + "can be set to 0 only when numPartitions=1 and " + TezRuntimeConfiguration
+          .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED + " is disabled. current numPartitions=" +
+          numPartitions + ", " + TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED + "="
+          + pipelinedShuffle);
+    }
+
     // Ideally, should be significantly larger.
     availableMemory = availableMemoryBytes;
 
@@ -179,15 +198,22 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     outputLargeRecordsCounter = outputContext.getCounters().findCounter(
         TaskCounter.OUTPUT_LARGE_RECORDS);
 
-    //Not checking for TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT as it might not add much value in
-    // this case.  Add it later if needed.
-    pipelinedShuffle = this.conf.getBoolean(TezRuntimeConfiguration
-        .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, TezRuntimeConfiguration
-        .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED_DEFAULT);
 
 
     indexFileSizeEstimate = numPartitions * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH;
-    LOG.info("pipelinedShuffle=" + pipelinedShuffle);
+
+    if (numPartitions == 1 && !pipelinedShuffle) {
+      //special case, wherein only one partition is available.
+      finalOutPath = outputFileHandler.getOutputFileForWrite();
+      finalIndexPath = outputFileHandler.getOutputIndexFileForWrite(indexFileSizeEstimate);
+      skipBuffers = true;
+      writer = new IFile.Writer(conf, rfs, finalOutPath, keyClass, valClass,
+          codec, outputRecordsCounter, outputRecordBytesCounter);
+    } else {
+      skipBuffers = false;
+      writer = null;
+    }
+    LOG.info("pipelinedShuffle=" + pipelinedShuffle + ", skipBuffers=" + skipBuffers);
   }
 
   private void computeNumBuffersAndSize(int bufferLimit) {
@@ -210,8 +236,13 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
       // Already reported as a fatalError - report to the user code
       throw new IOException("Exception during spill", new IOException(spillException));
     }
-    int partition = partitioner.getPartition(key, value, numPartitions);
-    write(key, value, partition);
+    if (skipBuffers) {
+      //special case, where we have only one partition and pipelining is disabled.
+      writer.append(key, value);
+    } else {
+      int partition = partitioner.getPartition(key, value, numPartitions);
+      write(key, value, partition);
+    }
   }
 
   @SuppressWarnings("unchecked")
@@ -451,6 +482,27 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
 
       List<Event> events = Lists.newLinkedList();
       if (!pipelinedShuffle) {
+        if (skipBuffers) {
+          writer.close();
+          long rawLen = writer.getRawLength();
+          long compLen = writer.getCompressedLength();
+          TezIndexRecord rec = new TezIndexRecord(0, rawLen, compLen);
+          TezSpillRecord sr = new TezSpillRecord(1);
+          sr.putIndex(rec, 0);
+          sr.writeToFile(finalIndexPath, conf);
+
+          BitSet emptyPartitions = new BitSet();
+          if (outputRecordsCounter.getValue() == 0) {
+            emptyPartitions.set(0);
+          }
+          cleanupCurrentBuffer();
+
+          outputBytesWithOverheadCounter.increment(rawLen);
+          fileOutputBytesCounter.increment(compLen + indexFileSizeEstimate);
+          return Collections.singletonList(generateDMEvent(false, -1, false, outputContext
+              .getUniqueIdentifier(), emptyPartitions));
+        }
+
         //Regular code path.
         if (numSpills.get() > 0) {
           mergeAll();

http://git-wip-us.apache.org/repos/asf/tez/blob/1ba1f927/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
index f455ffb..9914735 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
@@ -18,12 +18,11 @@
 
 package org.apache.tez.runtime.library.output;
 
-import java.nio.ByteBuffer;
-import java.util.BitSet;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,26 +31,21 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.TezUtils;
-import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
-import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.runtime.api.AbstractLogicalOutput;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.OutputContext;
-import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.library.api.KeyValuesWriter;
+import org.apache.tez.runtime.library.api.Partitioner;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
-import org.apache.tez.runtime.library.broadcast.output.FileBasedKVWriter;
-import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
-import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
-import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataProto;
+import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
+import org.apache.tez.runtime.library.common.writers.UnorderedPartitionedKVWriter;
+import org.apache.tez.runtime.library.partitioner.HashPartitioner;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
 
 /**
  * {@link UnorderedKVOutput} is a {@link LogicalOutput} that writes key
@@ -63,12 +57,13 @@ public class UnorderedKVOutput extends AbstractLogicalOutput {
 
   private static final Logger LOG = LoggerFactory.getLogger(UnorderedKVOutput.class);
 
-  private FileBasedKVWriter kvWriter;
+  @VisibleForTesting
+  UnorderedPartitionedKVWriter kvWriter;
   
   private Configuration conf;
   
-  private boolean dataViaEventsEnabled;
-  private int dataViaEventsMaxSize;
+  private MemoryUpdateCallbackHandler memoryUpdateCallbackHandler;
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
 
   public UnorderedKVOutput(OutputContext outputContext, int numPhysicalOutputs) {
     super(outputContext, numPhysicalOutputs);
@@ -82,25 +77,34 @@ public class UnorderedKVOutput extends AbstractLogicalOutput {
     this.conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS,
         getContext().getWorkDirs());
 
-    getContext().requestInitialMemory(0l, null); // mandatory call
+    this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, CustomPartitioner.class
+        .getName());
 
-    this.dataViaEventsEnabled = conf.getBoolean(
-        TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_ENABLED,
-        TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_ENABLED_DEFAULT);
-    this.dataViaEventsMaxSize = conf.getInt(
-        TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE,
-        TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE_DEFAULT);
-    
-    LOG.info(this.getClass().getSimpleName() + " running with params -> "
-        + "dataViaEventsEnabled: " + dataViaEventsEnabled
-        + ", dataViaEventsMaxSize: " + dataViaEventsMaxSize);
+    this.memoryUpdateCallbackHandler = new MemoryUpdateCallbackHandler();
+
+    boolean pipelinedShuffle = this.conf.getBoolean(TezRuntimeConfiguration
+        .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, TezRuntimeConfiguration
+        .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED_DEFAULT);
+
+    long memRequestSize = (pipelinedShuffle) ?
+        UnorderedPartitionedKVWriter.getInitialMemoryRequirement(conf, getContext()
+            .getTotalMemoryAvailableToTask()) : 0;
+    getContext().requestInitialMemory(memRequestSize, memoryUpdateCallbackHandler);
     
-    this.kvWriter = new FileBasedKVWriter(getContext(), conf);
     return Collections.emptyList();
   }
 
   @Override
-  public synchronized void start() {
+  public synchronized void start() throws Exception {
+    if (!isStarted.get()) {
+      memoryUpdateCallbackHandler.validateUpdateReceived();
+      //This would have just a single partition
+      this.kvWriter = new UnorderedPartitionedKVWriter(getContext(), conf, 1,
+          memoryUpdateCallbackHandler.getMemoryAssigned());
+      isStarted.set(true);
+      LOG.info(this.getClass().getSimpleName() + " started. MemoryAssigned="
+          + memoryUpdateCallbackHandler.getMemoryAssigned());
+    }
   }
 
   @Override
@@ -116,49 +120,12 @@ public class UnorderedKVOutput extends AbstractLogicalOutput {
 
   @Override
   public synchronized List<Event> close() throws Exception {
-    boolean outputGenerated = this.kvWriter.close();
-    
-    DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto
-        .newBuilder();
-    
-    LOG.info("Closing KVOutput: RawLength: " + this.kvWriter.getRawLength()
-        + ", CompressedLength: " + this.kvWriter.getCompressedLength());
-
-    if (dataViaEventsEnabled && outputGenerated && this.kvWriter.getCompressedLength() <= dataViaEventsMaxSize) {
-      LOG.info("Serialzing actual data into DataMovementEvent, dataSize: " + this.kvWriter.getCompressedLength());
-      byte[] data = this.kvWriter.getData();
-      DataProto.Builder dataProtoBuilder = DataProto.newBuilder();
-      dataProtoBuilder.setData(ByteString.copyFrom(data));
-      dataProtoBuilder.setRawLength((int)this.kvWriter.getRawLength());
-      dataProtoBuilder.setCompressedLength((int)this.kvWriter.getCompressedLength());
-      payloadBuilder.setData(dataProtoBuilder.build());
-    }
-
-    // Set the list of empty partitions - single partition on this case.
-    if (!outputGenerated) {
-      LOG.info("No output was generated");
-      BitSet emptyPartitions = new BitSet();
-      emptyPartitions.set(0);
-      ByteString emptyPartitionsBytesString =
-          TezCommonUtils.compressByteArrayToByteString(TezUtilsInternal.toByteArray(emptyPartitions));
-      payloadBuilder.setEmptyPartitions(emptyPartitionsBytesString);
-    }
-    if (outputGenerated) {
-      String host = getHost();
-      ByteBuffer shuffleMetadata = getContext()
-          .getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
-      int shufflePort = ShuffleUtils
-          .deserializeShuffleProviderMetaData(shuffleMetadata);
-      payloadBuilder.setHost(host);
-      payloadBuilder.setPort(shufflePort);
-      payloadBuilder.setPathComponent(getContext().getUniqueIdentifier());
+    if (isStarted.get()) {
+      //TODO: Do we need to support sending payloads via events?
+      return kvWriter.close();
+    } else {
+      return Collections.emptyList();
     }
-    DataMovementEventPayloadProto payloadProto = payloadBuilder.build();
-
-    DataMovementEvent dmEvent = DataMovementEvent.create(0, payloadProto.toByteString().asReadOnlyByteBuffer());
-    List<Event> events = Lists.newArrayListWithCapacity(1);
-    events.add(dmEvent);
-    return events;
   }
 
   @VisibleForTesting
@@ -173,12 +140,15 @@ public class UnorderedKVOutput extends AbstractLogicalOutput {
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED);
     confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX);
     confKeys.add(TezConfiguration.TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH);
     confKeys.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH);
@@ -192,4 +162,13 @@ public class UnorderedKVOutput extends AbstractLogicalOutput {
   public static Set<String> getConfigurationKeySet() {
     return Collections.unmodifiableSet(confKeys);
   }
+
+  @Private
+  public static class CustomPartitioner implements Partitioner {
+
+    @Override
+    public int getPartition(Object key, Object value, int numPartitions) {
+      return 0;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/1ba1f927/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java
index a0f834e..b70c9d7 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java
@@ -64,7 +64,7 @@ public class WeightedScalingMemoryDistributor implements InitialMemoryAllocator
   static final double MAX_ADDITIONAL_RESERVATION_FRACTION_PER_IO = 0.1d;
   static final double RESERVATION_FRACTION_PER_IO = 0.015d;
   static final String[] DEFAULT_TASK_MEMORY_WEIGHTED_RATIOS =
-      generateWeightStrings(1, 1, 12, 12, 1, 1);
+      generateWeightStrings(1, 1, 1, 12, 12, 1, 1);
 
   private Configuration conf;
 
@@ -265,11 +265,11 @@ public class WeightedScalingMemoryDistributor implements InitialMemoryAllocator
     return reserveFraction;
   }
 
-  public static String[] generateWeightStrings(int unsortedPartitioned, int broadcastIn,
-      int sortedOut, int scatterGatherShuffleIn, int proc, int other) {
+  public static String[] generateWeightStrings(int unsortedPartitioned, int unsorted,
+      int broadcastIn, int sortedOut, int scatterGatherShuffleIn, int proc, int other) {
     String[] weights = new String[RequestType.values().length];
     weights[0] = RequestType.PARTITIONED_UNSORTED_OUTPUT.name() + ":" + unsortedPartitioned;
-    weights[1] = RequestType.UNSORTED_OUTPUT.name() + ":" + 0;
+    weights[1] = RequestType.UNSORTED_OUTPUT.name() + ":" + unsorted;
     weights[2] = RequestType.UNSORTED_INPUT.name() + ":" + broadcastIn;
     weights[3] = RequestType.SORTED_OUTPUT.name() + ":" + sortedOut;
     weights[4] = RequestType.SORTED_MERGED_INPUT.name() + ":" + scatterGatherShuffleIn;

http://git-wip-us.apache.org/repos/asf/tez/blob/1ba1f927/tez-runtime-library/src/main/proto/ShufflePayloads.proto
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/proto/ShufflePayloads.proto b/tez-runtime-library/src/main/proto/ShufflePayloads.proto
index 6192381..f7b482d 100644
--- a/tez-runtime-library/src/main/proto/ShufflePayloads.proto
+++ b/tez-runtime-library/src/main/proto/ShufflePayloads.proto
@@ -26,7 +26,6 @@ message DataMovementEventPayloadProto {
   optional int32 port = 3;
   optional string path_component = 4;
   optional int32 run_duration = 5;
-  optional DataProto data = 6;
   optional bool pipelined = 7; // Related to pipelined shuffle
   optional bool last_event = 8; // Related to pipelined shuffle
   optional int32 spill_id = 9; //  Related to pipelined shuffle.

http://git-wip-us.apache.org/repos/asf/tez/blob/1ba1f927/tez-runtime-library/src/test/java/org/apache/tez/runtime/common/resources/TestWeightedScalingMemoryDistributor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/common/resources/TestWeightedScalingMemoryDistributor.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/common/resources/TestWeightedScalingMemoryDistributor.java
index 04d3594..b34accd 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/common/resources/TestWeightedScalingMemoryDistributor.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/common/resources/TestWeightedScalingMemoryDistributor.java
@@ -54,7 +54,7 @@ public class TestWeightedScalingMemoryDistributor extends TestMemoryDistributor
   public void testSimpleWeightedScaling() {
     Configuration conf = new Configuration(this.conf);
     conf.setStrings(TezConfiguration.TEZ_TASK_SCALE_MEMORY_WEIGHTED_RATIOS,
-        WeightedScalingMemoryDistributor.generateWeightStrings(0, 1, 2, 3, 1, 1));
+        WeightedScalingMemoryDistributor.generateWeightStrings(0, 0, 1, 2, 3, 1, 1));
     System.err.println(Joiner.on(",").join(conf.getStringCollection(
         TezConfiguration.TEZ_TASK_SCALE_MEMORY_WEIGHTED_RATIOS)));
 
@@ -101,7 +101,7 @@ public class TestWeightedScalingMemoryDistributor extends TestMemoryDistributor
   public void testAdditionalReserveFractionWeightedScaling() {
     Configuration conf = new Configuration(this.conf);
     conf.setStrings(TezConfiguration.TEZ_TASK_SCALE_MEMORY_WEIGHTED_RATIOS,
-        WeightedScalingMemoryDistributor.generateWeightStrings(0, 2, 3, 6, 1, 1));
+        WeightedScalingMemoryDistributor.generateWeightStrings(0, 0, 2, 3, 6, 1, 1));
     conf.setDouble(TezConfiguration.TEZ_TASK_SCALE_MEMORY_ADDITIONAL_RESERVATION_FRACTION_PER_IO, 0.025d);
     conf.setDouble(TezConfiguration.TEZ_TASK_SCALE_MEMORY_ADDITIONAL_RESERVATION_FRACTION_MAX, 0.2d);
 

http://git-wip-us.apache.org/repos/asf/tez/blob/1ba1f927/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
index 9b1318c..1a10eb8 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
@@ -197,6 +197,12 @@ public class TestUnorderedPartitionedKVWriter {
   }
 
   @Test(timeout = 10000)
+  public void testNoSpill_SinglPartition() throws IOException, InterruptedException {
+    baseTest(10, 1, null, shouldCompress);
+  }
+
+
+  @Test(timeout = 10000)
   public void testRandomText() throws IOException, InterruptedException {
     textTest(100, 10, 2048, 0, 0, 0, false);
   }
@@ -339,7 +345,6 @@ public class TestUnorderedPartitionedKVWriter {
     DataMovementEventPayloadProto eventProto = DataMovementEventPayloadProto.parseFrom(
         ByteString.copyFrom(cdme
             .getUserPayload()));
-    assertFalse(eventProto.hasData());
     BitSet emptyPartitionBits = null;
     if (partitionsWithData.cardinality() != numPartitions) {
       assertTrue(eventProto.hasEmptyPartitions());
@@ -549,7 +554,6 @@ public class TestUnorderedPartitionedKVWriter {
     assertEquals(numOutputs, cdme.getCount());
     DataMovementEventPayloadProto eventProto =
         DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(cdme.getUserPayload()));
-    assertFalse(eventProto.hasData());
     //Ensure that this is the last event
     assertTrue(eventProto.getLastEvent());
     if (eventProto.hasEmptyPartitions()) {
@@ -671,9 +675,11 @@ public class TestUnorderedPartitionedKVWriter {
     TezCounter numAdditionalSpillsCounter = counters
         .findCounter(TaskCounter.ADDITIONAL_SPILL_COUNT);
     assertEquals(numRecordsWritten * sizePerRecord, outputRecordBytesCounter.getValue());
+    if (numPartitions > 1) {
+      assertEquals(numRecordsWritten * sizePerRecordWithOverhead, outputBytesWithOverheadCounter.getValue());
+    }
     assertEquals(numRecordsWritten, outputRecordsCounter.getValue());
-    assertEquals(numRecordsWritten * sizePerRecordWithOverhead,
-        outputBytesWithOverheadCounter.getValue());
+
     long fileOutputBytes = fileOutputBytesCounter.getValue();
     if (numRecordsWritten > 0) {
       assertTrue(fileOutputBytes > 0);
@@ -710,7 +716,6 @@ public class TestUnorderedPartitionedKVWriter {
     DataMovementEventPayloadProto eventProto =
         DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(
             cdme.getUserPayload()));
-    assertFalse(eventProto.hasData());
     if (skippedPartitions == null && numRecordsWritten > 0) {
       assertFalse(eventProto.hasEmptyPartitions());
       emptyPartitionBits = new BitSet(numPartitions);

http://git-wip-us.apache.org/repos/asf/tez/blob/1ba1f927/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
index 4d9001d..509c23d 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
@@ -18,7 +18,13 @@
 
 package org.apache.tez.runtime.library.output;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -31,6 +37,7 @@ import java.util.List;
 import java.util.Map;
 
 import com.google.protobuf.ByteString;
+import org.apache.commons.lang.RandomStringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -49,14 +56,16 @@ import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.runtime.RuntimeTask;
 import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.MemoryUpdateCallback;
 import org.apache.tez.runtime.api.OutputContext;
-import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
 import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
 import org.apache.tez.runtime.api.impl.TezOutputContextImpl;
 import org.apache.tez.runtime.api.impl.TezUmbilical;
 import org.apache.tez.runtime.common.resources.MemoryDistributor;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
 import org.apache.tez.runtime.library.testutils.KVDataGen;
@@ -64,6 +73,9 @@ import org.apache.tez.runtime.library.testutils.KVDataGen.KVPair;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 public class TestOnFileUnorderedKVOutput {
 
@@ -72,6 +84,7 @@ public class TestOnFileUnorderedKVOutput {
   private static Configuration defaultConf = new Configuration();
   private static FileSystem localFs = null;
   private static Path workDir = null;
+  private static final int shufflePort = 2112;
 
   static {
     defaultConf.set("fs.defaultFS", "file:///");
@@ -87,6 +100,10 @@ public class TestOnFileUnorderedKVOutput {
   }
 
   @Before
+  public void setup() throws Exception {
+    localFs.mkdirs(workDir);
+  }
+
   @After
   public void cleanup() throws Exception {
     localFs.delete(workDir, true);
@@ -98,41 +115,13 @@ public class TestOnFileUnorderedKVOutput {
     conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName());
     conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, IntWritable.class.getName());
 
-    int appAttemptNumber = 1;
-    TezUmbilical tezUmbilical = null;
-    String dagName = "currentDAG";
-    String taskVertexName = "currentVertex";
-    String destinationVertexName = "destinationVertex";
-    TezDAGID dagID = TezDAGID.getInstance("2000", 1, 1);
-    TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
-    TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
-    TezTaskAttemptID taskAttemptID = TezTaskAttemptID.getInstance(taskID, 1);
-    TezCounters counters = new TezCounters();
-    UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf);
-    RuntimeTask runtimeTask = mock(RuntimeTask.class);
-    
-    int shufflePort = 2112;
-    Map<String, String> auxEnv = new HashMap<String, String>();
-    ByteBuffer bb = ByteBuffer.allocate(4);
-    bb.putInt(shufflePort);
-    bb.position(0);
-    AuxiliaryServiceHelper.setServiceDataIntoEnv(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID, bb, auxEnv);
-
-
-    OutputDescriptor outputDescriptor = mock(OutputDescriptor.class);
-    when(outputDescriptor.getClassName()).thenReturn("OutputDescriptor");
-
-    OutputContext outputContext = new TezOutputContextImpl(conf, new String[] {workDir.toString()},
-        appAttemptNumber, tezUmbilical, dagName, taskVertexName, destinationVertexName,
-        -1, taskAttemptID, counters, 0, userPayload, runtimeTask,
-        null, auxEnv, new MemoryDistributor(1, 1, conf) , outputDescriptor, null,
-        new ExecutionContextImpl("localhost"), Runtime.getRuntime().maxMemory());
+    OutputContext outputContext = createOutputContext(conf);
 
-    UnorderedKVOutput kvOutput = new OnFileUnorderedKVOutputForTest(outputContext, 1);
+    UnorderedKVOutput kvOutput = new UnorderedKVOutput(outputContext, 1);
 
     List<Event> events = null;
-
     events = kvOutput.initialize();
+    kvOutput.start();
     assertTrue(events != null && events.size() == 0);
 
     KeyValueWriter kvWriter = kvOutput.getWriter();
@@ -143,9 +132,9 @@ public class TestOnFileUnorderedKVOutput {
 
     events = kvOutput.close();
     assertTrue(events != null && events.size() == 1);
-    DataMovementEvent dmEvent = (DataMovementEvent)events.get(0);
+    CompositeDataMovementEvent dmEvent = (CompositeDataMovementEvent)events.get(0);
 
-    assertEquals("Invalid source index", 0, dmEvent.getSourceIndex());
+    assertEquals("Invalid source index", 0, dmEvent.getSourceIndexStart());
 
     DataMovementEventPayloadProto shufflePayload = DataMovementEventPayloadProto
         .parseFrom(ByteString.copyFrom(dmEvent.getUserPayload()));
@@ -153,18 +142,97 @@ public class TestOnFileUnorderedKVOutput {
     assertFalse(shufflePayload.hasEmptyPartitions());
     assertEquals(outputContext.getUniqueIdentifier(), shufflePayload.getPathComponent());
     assertEquals(shufflePort, shufflePayload.getPort());
-    assertEquals("host", shufflePayload.getHost());
+    assertEquals("localhost", shufflePayload.getHost());
   }
 
-  private static class OnFileUnorderedKVOutputForTest extends UnorderedKVOutput {
+  @Test(timeout = 30000)
+  @SuppressWarnings("unchecked")
+  public void testWithPipelinedShuffle() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName());
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, IntWritable.class.getName());
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, true);
+
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB, 1);
 
-    public OnFileUnorderedKVOutputForTest(OutputContext outputContext, int numPhysicalOutputs) {
-      super(outputContext, numPhysicalOutputs);
-    }
+    OutputContext outputContext = createOutputContext(conf);
+
+    UnorderedKVOutput kvOutput = new UnorderedKVOutput(outputContext, 1);
+
+    List<Event> events = null;
+    events = kvOutput.initialize();
+    kvOutput.start();
+    assertTrue(events != null && events.size() == 0);
 
-    @Override
-    String getHost() {
-      return "host";
+    KeyValueWriter kvWriter = kvOutput.getWriter();
+    for (int i = 0; i < 500; i++) {
+      kvWriter.write(new Text(RandomStringUtils.randomAscii(10000)), new IntWritable(i));
     }
+
+    events = kvOutput.close();
+    //When pipelining is on, all events are sent out within writer itself.
+    assertTrue(events != null && events.size() == 0);
+
+    //ensure that data is sent via outputContext.
+    ArgumentCaptor<List> eventsCaptor = ArgumentCaptor.forClass(List.class);
+    verify(outputContext, atLeast(1)).sendEvents(eventsCaptor.capture());
+    events = eventsCaptor.getValue();
+
+
+    CompositeDataMovementEvent dmEvent = (CompositeDataMovementEvent)events.get(0);
+    assertEquals("Invalid source index", 0, dmEvent.getSourceIndexStart());
+
+    DataMovementEventPayloadProto shufflePayload = DataMovementEventPayloadProto
+        .parseFrom(ByteString.copyFrom(dmEvent.getUserPayload()));
+
+    assertTrue(shufflePayload.hasLastEvent());
+
+    assertFalse(shufflePayload.hasEmptyPartitions());
+    assertEquals(shufflePort, shufflePayload.getPort());
+    assertEquals("localhost", shufflePayload.getHost());
+  }
+
+  private OutputContext createOutputContext(Configuration conf) throws IOException {
+    int appAttemptNumber = 1;
+    TezUmbilical tezUmbilical = mock(TezUmbilical.class);
+    String dagName = "currentDAG";
+    String taskVertexName = "currentVertex";
+    String destinationVertexName = "destinationVertex";
+    TezDAGID dagID = TezDAGID.getInstance("2000", 1, 1);
+    TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
+    TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
+    TezTaskAttemptID taskAttemptID = TezTaskAttemptID.getInstance(taskID, 1);
+    TezCounters counters = new TezCounters();
+    UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf);
+    RuntimeTask runtimeTask = mock(RuntimeTask.class);
+
+
+    Map<String, String> auxEnv = new HashMap<String, String>();
+    ByteBuffer bb = ByteBuffer.allocate(4);
+    bb.putInt(shufflePort);
+    bb.position(0);
+    AuxiliaryServiceHelper.setServiceDataIntoEnv(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID, bb, auxEnv);
+
+
+    OutputDescriptor outputDescriptor = mock(OutputDescriptor.class);
+    when(outputDescriptor.getClassName()).thenReturn("OutputDescriptor");
+
+    OutputContext realOutputContext = new TezOutputContextImpl(conf, new String[] {workDir.toString()},
+        appAttemptNumber, tezUmbilical, dagName, taskVertexName, destinationVertexName,
+        -1, taskAttemptID, counters, 0, userPayload, runtimeTask,
+        null, auxEnv, new MemoryDistributor(1, 1, conf) , outputDescriptor, null,
+        new ExecutionContextImpl("localhost"), 2048);
+    OutputContext outputContext = spy(realOutputContext);
+    doAnswer(new Answer() {
+      @Override public Object answer(InvocationOnMock invocation) throws Throwable {
+        long requestedSize = (Long) invocation.getArguments()[0];
+        MemoryUpdateCallbackHandler callback = (MemoryUpdateCallbackHandler) invocation
+            .getArguments()[1];
+        callback.memoryAssigned(requestedSize);
+        return null;
+      }
+    }).when(outputContext).requestInitialMemory(anyLong(), any(MemoryUpdateCallback.class));
+
+    return outputContext;
   }
 }


Mime
View raw message