tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [14/29] tez git commit: TEZ-2001. Support pipelined data transfer for ordered output (rbalamohan)
Date Wed, 04 Mar 2015 20:52:00 GMT
http://git-wip-us.apache.org/repos/asf/tez/blob/c85b724d/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
new file mode 100644
index 0000000..f68032f
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
@@ -0,0 +1,256 @@
+package org.apache.tez.runtime.library.common.shuffle;
+
+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezRuntimeFrameworkConfigs;
+import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
+import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
+import org.apache.tez.runtime.library.partitioner.HashPartitioner;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Random;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public class TestShuffleUtils {
+
+  private static final String HOST = "localhost";
+  private static final int PORT = 8080;
+  private static final String PATH_COMPONENT = "attempt";
+
+  private OutputContext outputContext;
+  private Configuration conf;
+  private FileSystem localFs;
+  private Path workingDir;
+
+  private InputContext createTezInputContext() {
+    ApplicationId applicationId = ApplicationId.newInstance(1, 1);
+    InputContext inputContext = mock(InputContext.class);
+    doReturn(applicationId).when(inputContext).getApplicationId();
+    doReturn("sourceVertex").when(inputContext).getSourceVertexName();
+    when(inputContext.getCounters()).thenReturn(new TezCounters());
+    return inputContext;
+  }
+
+  private OutputContext createTezOutputContext() throws IOException {
+    ApplicationId applicationId = ApplicationId.newInstance(1, 1);
+    OutputContext outputContext = mock(OutputContext.class);
+
+    ExecutionContextImpl executionContext = mock(ExecutionContextImpl.class);
+    doReturn("localhost").when(executionContext).getHostName();
+    doReturn(executionContext).when(outputContext).getExecutionContext();
+
+    DataOutputBuffer serviceProviderMetaData = new DataOutputBuffer();
+    serviceProviderMetaData.writeInt(80);
+    doReturn(ByteBuffer.wrap(serviceProviderMetaData.getData())).when(outputContext)
+        .getServiceProviderMetaData
+            (ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
+
+
+    doReturn(1).when(outputContext).getTaskVertexIndex();
+    doReturn(1).when(outputContext).getOutputIndex();
+    doReturn(0).when(outputContext).getDAGAttemptNumber();
+    doReturn("destVertex").when(outputContext).getDestinationVertexName();
+
+    when(outputContext.getCounters()).thenReturn(new TezCounters());
+    return outputContext;
+  }
+
+
+  @Before
+  public void setup() throws Exception {
+    outputContext = createTezOutputContext();
+    conf = new Configuration();
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS, 1); // DefaultSorter
+    conf.set("fs.defaultFS", "file:///");
+    localFs = FileSystem.getLocal(conf);
+
+    workingDir = new Path(
+        new Path(System.getProperty("test.build.data", "/tmp")),
+        TestShuffleUtils.class.getName())
+        .makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
+    String localDirs = workingDir.toString();
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName());
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName());
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS,
+        HashPartitioner.class.getName());
+    conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, localDirs);
+  }
+
+  private Path createIndexFile(int numPartitions, boolean allEmptyPartitions) throws IOException {
+    Path path = new Path(workingDir, "file.index.out");
+    TezSpillRecord spillRecord = new TezSpillRecord(numPartitions);
+    long startOffset = 0;
+    long partLen = 200; //compressed
+    Random rnd = new Random();
+    for(int i=0;i<numPartitions;i++) {
+      long rawLen = rnd.nextLong();
+      if (i % 2  == 0 || allEmptyPartitions) {
+        rawLen = 6; //indicates empty partition
+      }
+      TezIndexRecord indexRecord = new TezIndexRecord(startOffset, rawLen, partLen);
+      startOffset += partLen;
+      spillRecord.putIndex(indexRecord, i);
+    }
+    spillRecord.writeToFile(path, conf);
+    return path;
+  }
+
+  @Test
+  public void testGenerateOnSpillEvent() throws Exception {
+    List<Event> events = Lists.newLinkedList();
+    Path indexFile = createIndexFile(10, false);
+
+    boolean finalMergeEnabled = false;
+    boolean isLastEvent = false;
+    int spillId = 0;
+    int physicalOutputs = 10;
+    String pathComponent = "/attempt_x_y_0/file.out";
+    ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent, outputContext,
+        spillId, new TezSpillRecord(indexFile, conf), physicalOutputs, true, pathComponent);
+
+    Assert.assertTrue(events.size() == 1);
+    Assert.assertTrue(events.get(0) instanceof CompositeDataMovementEvent);
+
+    CompositeDataMovementEvent cdme = (CompositeDataMovementEvent) events.get(0);
+    Assert.assertTrue(cdme.getCount() == physicalOutputs);
+    Assert.assertTrue(cdme.getSourceIndexStart() == 0);
+
+    ByteBuffer payload = cdme.getUserPayload();
+    ShuffleUserPayloads.DataMovementEventPayloadProto dmeProto =
+        ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(payload));
+
+    Assert.assertTrue(dmeProto.getSpillId() == 0);
+    Assert.assertTrue(dmeProto.hasLastEvent() && !dmeProto.getLastEvent());
+
+    byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(dmeProto.getEmptyPartitions());
+    BitSet emptyPartitionsBitSet = TezUtilsInternal.fromByteArray(emptyPartitions);
+    Assert.assertTrue("emptyPartitionBitSet cardinality (expecting 5) = " + emptyPartitionsBitSet
+        .cardinality(), emptyPartitionsBitSet.cardinality() == 5);
+
+    events.clear();
+
+  }
+
+  @Test
+  public void testGenerateOnSpillEvent_With_FinalMerge() throws Exception {
+    List<Event> events = Lists.newLinkedList();
+    Path indexFile = createIndexFile(10, false);
+
+    boolean finalMergeEnabled = true;
+    boolean isLastEvent = true;
+    int spillId = 0;
+    int physicalOutputs = 10;
+    String pathComponent = "/attempt_x_y_0/file.out";
+
+    //normal code path where we do final merge all the time
+    ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent, outputContext,
+        spillId, new TezSpillRecord(indexFile, conf), physicalOutputs, true, pathComponent);
+
+    Assert.assertTrue(events.size() == 2); //one for VM
+    Assert.assertTrue(events.get(0) instanceof VertexManagerEvent);
+    Assert.assertTrue(events.get(1) instanceof CompositeDataMovementEvent);
+
+    CompositeDataMovementEvent cdme = (CompositeDataMovementEvent) events.get(1);
+    Assert.assertTrue(cdme.getCount() == physicalOutputs);
+    Assert.assertTrue(cdme.getSourceIndexStart() == 0);
+
+    ShuffleUserPayloads.DataMovementEventPayloadProto dmeProto =
+        ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom( cdme.getUserPayload()));
+
+    //With final merge, spill details should not be present
+    Assert.assertFalse(dmeProto.hasSpillId());
+    Assert.assertFalse(dmeProto.hasLastEvent() || dmeProto.getLastEvent());
+
+    byte[]  emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(dmeProto
+        .getEmptyPartitions());
+    BitSet  emptyPartitionsBitSet = TezUtilsInternal.fromByteArray(emptyPartitions);
+    Assert.assertTrue("emptyPartitionBitSet cardinality (expecting 5) = " + emptyPartitionsBitSet
+        .cardinality(), emptyPartitionsBitSet.cardinality() == 5);
+
+  }
+
+  @Test
+  public void testGenerateOnSpillEvent_With_All_EmptyPartitions() throws Exception {
+    List<Event> events = Lists.newLinkedList();
+
+    //Create an index file with all empty partitions
+    Path indexFile = createIndexFile(10, true);
+
+    boolean finalMergeDisabled = false;
+    boolean isLastEvent = true;
+    int spillId = 0;
+    int physicalOutputs = 10;
+    String pathComponent = "/attempt_x_y_0/file.out";
+
+    //final merge is disabled here (flag passed as false), so spill details are sent with the event
+    ShuffleUtils.generateEventOnSpill(events, finalMergeDisabled, isLastEvent, outputContext,
+        spillId, new TezSpillRecord(indexFile, conf), physicalOutputs, true, pathComponent);
+
+    Assert.assertTrue(events.size() == 2); //one for VM
+    Assert.assertTrue(events.get(0) instanceof VertexManagerEvent);
+    Assert.assertTrue(events.get(1) instanceof CompositeDataMovementEvent);
+
+    CompositeDataMovementEvent cdme = (CompositeDataMovementEvent) events.get(1);
+    Assert.assertTrue(cdme.getCount() == physicalOutputs);
+    Assert.assertTrue(cdme.getSourceIndexStart() == 0);
+
+    ShuffleUserPayloads.DataMovementEventPayloadProto dmeProto =
+        ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom( cdme.getUserPayload()));
+
+    //spill details should be present
+    Assert.assertTrue(dmeProto.getSpillId() == 0);
+    Assert.assertTrue(dmeProto.hasLastEvent() && dmeProto.getLastEvent());
+
+    Assert.assertTrue(dmeProto.getPathComponent().equals(""));
+
+    byte[]  emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(dmeProto
+        .getEmptyPartitions());
+    BitSet  emptyPartitionsBitSet = TezUtilsInternal.fromByteArray(emptyPartitions);
+    Assert.assertTrue("emptyPartitionBitSet cardinality (expecting 10) = " + emptyPartitionsBitSet
+        .cardinality(), emptyPartitionsBitSet.cardinality() == 10);
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/c85b724d/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandler.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandler.java
deleted file mode 100644
index b3334b4..0000000
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandler.java
+++ /dev/null
@@ -1,172 +0,0 @@
-package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
-
-import com.google.protobuf.ByteString;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.tez.common.TezCommonUtils;
-import org.apache.tez.common.TezUtilsInternal;
-import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.InputContext;
-import org.apache.tez.runtime.api.events.DataMovementEvent;
-import org.apache.tez.runtime.api.events.InputFailedEvent;
-import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
-import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.BitSet;
-import java.util.LinkedList;
-import java.util.List;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-public class TestShuffleInputEventHandler {
-
-  private static final String HOST = "localhost";
-  private static final int PORT = 8080;
-  private static final String PATH_COMPONENT = "attempt";
-
-  private ShuffleInputEventHandlerOrderedGrouped handler;
-  private ShuffleScheduler scheduler;
-
-  private InputContext createTezInputContext() {
-    ApplicationId applicationId = ApplicationId.newInstance(1, 1);
-    InputContext inputContext = mock(InputContext.class);
-    doReturn(applicationId).when(inputContext).getApplicationId();
-    return inputContext;
-  }
-
-  private Event createDataMovementEvent(int srcIndex, int targetIndex,
-      ByteString emptyPartitionByteString, boolean allPartitionsEmpty) {
-    ShuffleUserPayloads.DataMovementEventPayloadProto.Builder builder =
-        ShuffleUserPayloads.DataMovementEventPayloadProto
-            .newBuilder();
-    if (!allPartitionsEmpty) {
-      builder.setHost(HOST);
-      builder.setPort(PORT);
-      builder.setPathComponent(PATH_COMPONENT);
-    }
-    builder.setRunDuration(10);
-    if (emptyPartitionByteString != null) {
-      builder.setEmptyPartitions(emptyPartitionByteString);
-    }
-    return DataMovementEvent
-        .create(srcIndex, targetIndex, 0, builder.build().toByteString().asReadOnlyByteBuffer());
-  }
-
-  @Before
-  public void setup() throws Exception {
-    InputContext inputContext = createTezInputContext();
-    scheduler = mock(ShuffleScheduler.class);
-    handler = new ShuffleInputEventHandlerOrderedGrouped(inputContext, scheduler, false);
-  }
-
-  @Test(timeout = 5000)
-  public void basicTest() throws IOException {
-    List<Event> events = new LinkedList<Event>();
-    int srcIdx = 0;
-    int targetIdx = 1;
-    Event dme = createDataMovementEvent(srcIdx, targetIdx, null, false);
-    events.add(dme);
-    handler.handleEvents(events);
-    InputAttemptIdentifier expectedIdentifier = new InputAttemptIdentifier(targetIdx, 0,
-        PATH_COMPONENT);
-
-    String baseUri = handler.getBaseURI(HOST, PORT, srcIdx).toString();
-    int partitionId = srcIdx;
-    verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId),
-        eq(baseUri), eq(expectedIdentifier));
-  }
-
-  @Test(timeout = 5000)
-  public void testFailedEvent() throws IOException {
-    List<Event> events = new LinkedList<Event>();
-    int targetIdx = 1;
-    InputFailedEvent failedEvent = InputFailedEvent.create(targetIdx, 0);
-    events.add(failedEvent);
-    handler.handleEvents(events);
-    InputAttemptIdentifier expectedIdentifier = new InputAttemptIdentifier(targetIdx, 0);
-    verify(scheduler).obsoleteInput(eq(expectedIdentifier));
-  }
-
-  @Test(timeout = 5000)
-  public void testAllPartitionsEmpty() throws IOException {
-    List<Event> events = new LinkedList<Event>();
-    int srcIdx = 0;
-    int targetIdx = 1;
-    Event dme = createDataMovementEvent(srcIdx, targetIdx, createEmptyPartitionByteString(srcIdx)
-        , true);
-    events.add(dme);
-    handler.handleEvents(events);
-
-    InputAttemptIdentifier expectedIdentifier = new InputAttemptIdentifier(targetIdx, 0);
-    verify(scheduler).copySucceeded(eq(expectedIdentifier), any(MapHost.class), eq(0l),
-        eq(0l), eq(0l), any(MapOutput.class));
-  }
-
-  @Test(timeout = 5000)
-  public void testCurrentPartitionEmpty() throws IOException {
-    List<Event> events = new LinkedList<Event>();
-    int srcIdx = 0;
-    int targetIdx = 1;
-    Event dme = createDataMovementEvent(srcIdx, targetIdx, createEmptyPartitionByteString(srcIdx)
-        , false);
-    events.add(dme);
-    handler.handleEvents(events);
-
-    InputAttemptIdentifier expectedIdentifier = new InputAttemptIdentifier(targetIdx, 0);
-
-    verify(scheduler).copySucceeded(eq(expectedIdentifier), any(MapHost.class), eq(0l),
-        eq(0l), eq(0l), any(MapOutput.class));
-  }
-
-  @Test(timeout = 5000)
-  public void testOtherPartitionEmpty() throws IOException {
-    List<Event> events = new LinkedList<Event>();
-    int srcIdx = 0;
-    int taskIndex = 1;
-    Event dme = createDataMovementEvent(srcIdx, taskIndex, createEmptyPartitionByteString(100),
-        false);
-    events.add(dme);
-    handler.handleEvents(events);
-
-    String baseUri = handler.getBaseURI(HOST, PORT, srcIdx).toString();
-    int partitionId = srcIdx;
-    InputAttemptIdentifier expectedIdentifier =
-        new InputAttemptIdentifier(taskIndex, 0, PATH_COMPONENT);
-
-    verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(baseUri),
-        eq(expectedIdentifier));
-  }
-
-  private ByteString createEmptyPartitionByteString(int... emptyPartitions) throws IOException {
-    BitSet bitSet = new BitSet();
-    for (int i : emptyPartitions) {
-      bitSet.set(i);
-    }
-    return TezCommonUtils.compressByteArrayToByteString(TezUtilsInternal.toByteArray(bitSet));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/c85b724d/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
new file mode 100644
index 0000000..700d9a5
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
@@ -0,0 +1,295 @@
+package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
+
+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.InputFailedEvent;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.InputIdentifier;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public class TestShuffleInputEventHandlerOrderedGrouped {
+  private static final String HOST = "localhost";
+  private static final int PORT = 8080;
+  private static final String PATH_COMPONENT = "attempt";
+  private ShuffleInputEventHandlerOrderedGrouped handler;
+  private ShuffleScheduler scheduler;
+  private ShuffleScheduler realScheduler;
+  private MergeManager mergeManager;
+
+  private InputContext createTezInputContext() {
+    ApplicationId applicationId = ApplicationId.newInstance(1, 1);
+    InputContext inputContext = mock(InputContext.class);
+    doReturn(applicationId).when(inputContext).getApplicationId();
+    doReturn("sourceVertex").when(inputContext).getSourceVertexName();
+    when(inputContext.getCounters()).thenReturn(new TezCounters());
+    return inputContext;
+  }
+
+  private Event createDataMovementEvent(int srcIndex, int targetIndex,
+      ByteString emptyPartitionByteString, boolean allPartitionsEmpty) {
+    return createDataMovementEvent(srcIndex, targetIndex, emptyPartitionByteString,
+        allPartitionsEmpty, false, false, 0);
+  }
+
+  private Event createDataMovementEvent(int srcIndex, int targetIndex,
+      ByteString emptyPartitionByteString, boolean allPartitionsEmpty, boolean
+      finalMergeDisabled, boolean incrementalEvent) {
+    return createDataMovementEvent(srcIndex, targetIndex, emptyPartitionByteString,
+        allPartitionsEmpty, finalMergeDisabled, incrementalEvent, 0);
+  }
+
+  private Event createDataMovementEvent(int srcIndex, int targetIndex,
+      ByteString emptyPartitionByteString, boolean allPartitionsEmpty, boolean
+      finalMergeDisabled, boolean incrementalEvent, int spillId) {
+    return createDataMovementEvent(srcIndex, targetIndex, emptyPartitionByteString,
+        allPartitionsEmpty, finalMergeDisabled, incrementalEvent, spillId, HOST, PORT);
+  }
+
+
+  private Event createDataMovementEvent(int srcIndex, int targetIndex,
+      ByteString emptyPartitionByteString, boolean allPartitionsEmpty, boolean
+      finalMergeDisabled, boolean incrementalEvent, int spillId, String host, int port) {
+    ShuffleUserPayloads.DataMovementEventPayloadProto.Builder builder =
+        ShuffleUserPayloads.DataMovementEventPayloadProto
+            .newBuilder();
+    if (!allPartitionsEmpty) {
+      builder.setHost(host);
+      builder.setPort(port);
+      builder.setPathComponent(PATH_COMPONENT);
+    }
+    if (finalMergeDisabled) {
+      builder.setLastEvent(incrementalEvent ? false : true);
+      builder.setSpillId(spillId);
+    }
+    builder.setRunDuration(10);
+    if (emptyPartitionByteString != null) {
+      builder.setEmptyPartitions(emptyPartitionByteString);
+    }
+    return DataMovementEvent
+        .create(srcIndex, targetIndex, 0, builder.build().toByteString().asReadOnlyByteBuffer());
+  }
+
+  @Before
+  public void setup() throws Exception {
+   setupScheduler(2);
+  }
+
+  private void setupScheduler(int numInputs) throws Exception {
+    InputContext inputContext = createTezInputContext();
+    Configuration config = new Configuration();
+    TezCounter shuffledInputsCounter =
+        inputContext.getCounters().findCounter(TaskCounter.NUM_SHUFFLED_INPUTS);
+    TezCounter reduceShuffleBytes =
+        inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES);
+    TezCounter reduceDataSizeDecompressed = inputContext.getCounters().findCounter(
+        TaskCounter.SHUFFLE_BYTES_DECOMPRESSED);
+    TezCounter failedShuffleCounter =
+        inputContext.getCounters().findCounter(TaskCounter.NUM_FAILED_SHUFFLE_INPUTS);
+    TezCounter bytesShuffedToDisk = inputContext.getCounters().findCounter(
+        TaskCounter.SHUFFLE_BYTES_TO_DISK);
+    TezCounter bytesShuffedToDiskDirect = inputContext.getCounters().findCounter(
+        TaskCounter.SHUFFLE_BYTES_DISK_DIRECT);
+    TezCounter bytesShuffedToMem = inputContext.getCounters().findCounter(
+        TaskCounter.SHUFFLE_BYTES_TO_MEM);
+    realScheduler = new ShuffleScheduler(
+        inputContext,
+        config,
+        numInputs,
+        null,
+        shuffledInputsCounter,
+        reduceShuffleBytes,
+        reduceDataSizeDecompressed,
+        failedShuffleCounter,
+        bytesShuffedToDisk,
+        bytesShuffedToDiskDirect,
+        bytesShuffedToMem,
+        System.currentTimeMillis());
+    scheduler = spy(realScheduler);
+    handler = new ShuffleInputEventHandlerOrderedGrouped(inputContext, scheduler, false);
+    mergeManager = mock(MergeManager.class);
+  }
+
+  @Test
+  public void testFinalMergeDisabledEvents() throws IOException, InterruptedException {
+    //test with 2 events per input (2 inputs)
+    int attemptNum = 0;
+    int inputIdx = 0;
+    Event dme1 = createDataMovementEvent(attemptNum, inputIdx, null, false, true, true, 0);
+    InputAttemptIdentifier id1 =
+        new InputAttemptIdentifier(new InputIdentifier(inputIdx), attemptNum,
+            PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0);
+    handler.handleEvents(Collections.singletonList(dme1));
+    String baseUri = handler.getBaseURI(HOST, PORT, attemptNum).toString();
+    int partitionId = attemptNum;
+    verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(baseUri), eq(id1));
+    verify(scheduler).shuffleInfoEventsMap.containsKey(id1);
+
+    //Send final_update event.
+    Event dme2 = createDataMovementEvent(attemptNum, inputIdx, null, false, true, false, 1);
+    InputAttemptIdentifier id2 =
+        new InputAttemptIdentifier(new InputIdentifier(inputIdx), attemptNum,
+            PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 1);
+    handler.handleEvents(Collections.singletonList(dme2));
+    baseUri = handler.getBaseURI(HOST, PORT, attemptNum).toString();
+    partitionId = attemptNum;
+    assertTrue(scheduler.shuffleInfoEventsMap.containsKey(id2));
+    verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(baseUri), eq(id2));
+    assertTrue(scheduler.shuffleInfoEventsMap.containsKey(id2));
+
+    MapHost host = scheduler.getHost();
+    assertTrue(host != null);
+    List<InputAttemptIdentifier> list = scheduler.getMapsForHost(host);
+    assertTrue(!list.isEmpty());
+    //Let the final_update event pass
+    MapOutput output = MapOutput.createMemoryMapOutput(id2, mergeManager, 1000, true);
+    scheduler.copySucceeded(id2, host, 1000, 10000, 10000, output);
+    assertTrue(!scheduler.isDone()); //we haven't downloaded id1 yet
+    output = MapOutput.createMemoryMapOutput(id1, mergeManager, 1000, true);
+    scheduler.copySucceeded(id1, host, 1000, 10000, 10000, output);
+    assertTrue(!scheduler.isDone()); //we haven't downloaded another source yet
+
+    //Send events for source 2
+    attemptNum = 0;
+    inputIdx = 1;
+    Event dme3 = createDataMovementEvent(attemptNum, inputIdx, null, false, true,
+        true, 1);
+    InputAttemptIdentifier id3 = new InputAttemptIdentifier(new InputIdentifier(inputIdx),
+        attemptNum, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE,
+        0);
+    handler.handleEvents(Collections.singletonList(dme3));
+    //Final_update event for an empty partition: invoke copySucceeded directly instead of sending an event.
+    InputAttemptIdentifier id4 = new InputAttemptIdentifier(new InputIdentifier(inputIdx),
+        attemptNum, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 1);
+    assertTrue(!scheduler.isInputFinished(id4.getInputIdentifier().getInputIndex()));
+    scheduler.copySucceeded(id4, null, 0, 0, 0, null);
+    assertTrue(!scheduler.isDone()); //we haven't downloaded another id yet
+    //Let the incremental event pass
+    output = MapOutput.createMemoryMapOutput(id3, mergeManager, 1000, true);
+    scheduler.copySucceeded(id3, host, 1000, 10000, 10000, output);
+    assertTrue(scheduler.isDone());
+  }
+
+  @Test(timeout = 5000)
+  public void basicTest() throws IOException {
+    List<Event> events = new LinkedList<Event>();
+    int srcIdx = 0;
+    int targetIdx = 1;
+    Event dme = createDataMovementEvent(srcIdx, targetIdx, null, false);
+    events.add(dme);
+    handler.handleEvents(events);
+    InputAttemptIdentifier expectedIdentifier = new InputAttemptIdentifier(targetIdx, 0,
+        PATH_COMPONENT);
+    String baseUri = handler.getBaseURI(HOST, PORT, srcIdx).toString();
+    int partitionId = srcIdx;
+    verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId),
+        eq(baseUri), eq(expectedIdentifier));
+  }
+
+  @Test(timeout = 5000)
+  public void testFailedEvent() throws IOException {
+    List<Event> events = new LinkedList<Event>();
+    int targetIdx = 1;
+    InputFailedEvent failedEvent = InputFailedEvent.create(targetIdx, 0);
+    events.add(failedEvent);
+    handler.handleEvents(events);
+    InputAttemptIdentifier expectedIdentifier = new InputAttemptIdentifier(targetIdx, 0);
+    verify(scheduler).obsoleteInput(eq(expectedIdentifier));
+  }
+
+  @Test(timeout = 5000)
+  public void testAllPartitionsEmpty() throws IOException {
+    List<Event> events = new LinkedList<Event>();
+    int srcIdx = 0;
+    int targetIdx = 1;
+    Event dme = createDataMovementEvent(srcIdx, targetIdx, createEmptyPartitionByteString(srcIdx)
+        , true);
+    events.add(dme);
+    handler.handleEvents(events);
+    InputAttemptIdentifier expectedIdentifier = new InputAttemptIdentifier(targetIdx, 0);
+    verify(scheduler).copySucceeded(eq(expectedIdentifier), any(MapHost.class), eq(0l),
+        eq(0l), eq(0l), any(MapOutput.class));
+  }
+
+  @Test(timeout = 5000)
+  public void testCurrentPartitionEmpty() throws IOException {
+    List<Event> events = new LinkedList<Event>();
+    int srcIdx = 0;
+    int targetIdx = 1;
+    Event dme = createDataMovementEvent(srcIdx, targetIdx, createEmptyPartitionByteString(srcIdx)
+        , false);
+    events.add(dme);
+    handler.handleEvents(events);
+    InputAttemptIdentifier expectedIdentifier = new InputAttemptIdentifier(targetIdx, 0);
+    verify(scheduler).copySucceeded(eq(expectedIdentifier), any(MapHost.class), eq(0l),
+        eq(0l), eq(0l), any(MapOutput.class));
+  }
+
+  @Test(timeout = 5000)
+  public void testOtherPartitionEmpty() throws IOException {
+    List<Event> events = new LinkedList<Event>();
+    int srcIdx = 0;
+    int taskIndex = 1;
+    Event dme = createDataMovementEvent(srcIdx, taskIndex, createEmptyPartitionByteString(100),
+        false);
+    events.add(dme);
+    handler.handleEvents(events);
+    String baseUri = handler.getBaseURI(HOST, PORT, srcIdx).toString();
+    int partitionId = srcIdx;
+    InputAttemptIdentifier expectedIdentifier =
+        new InputAttemptIdentifier(taskIndex, 0, PATH_COMPONENT);
+    verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(baseUri),
+        eq(expectedIdentifier));
+  }
+
+  private ByteString createEmptyPartitionByteString(int... emptyPartitions) throws IOException {
+    BitSet bitSet = new BitSet();
+    for (int i : emptyPartitions) {
+      bitSet.set(i);
+    }
+    return TezCommonUtils.compressByteArrayToByteString(TezUtilsInternal.toByteArray(bitSet));
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/c85b724d/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
index f8331d6..d0187bd 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
@@ -6,15 +6,19 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.ExecutionContext;
 import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
-import org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.apache.tez.runtime.library.partitioner.HashPartitioner;
 import org.junit.After;
 import org.junit.Assert;
@@ -22,13 +26,19 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.UUID;
 
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyListOf;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.internal.verification.VerificationModeFactory.times;
 
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -73,7 +83,7 @@ public class TestPipelinedSorter {
   }
 
   @Before
-  public void setup() {
+  public void setup() throws IOException {
     ApplicationId appId = ApplicationId.newInstance(10000, 1);
     TezCounters counters = new TezCounters();
     String uniqueId = UUID.randomUUID().toString();
@@ -95,6 +105,7 @@ public class TestPipelinedSorter {
   public void cleanup() throws IOException {
     localFs.delete(workDir, true);
     sortedDataMap.clear();
+    localFs.mkdirs(workDir);
   }
 
   @Test
@@ -121,13 +132,29 @@ public class TestPipelinedSorter {
 
   }
 
+  @Test
+  public void testWithPipelinedShuffle() throws IOException {
+    this.numOutputs = 1;
+    this.initialAvailableMem = 5 *1024 * 1024;
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 5);
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_SORTER, false);
+    PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
+        initialAvailableMem, 1<<20);
+
+    //Write 10000 keys each of size 100
+    writeData(sorter, 10000, 100);
+
+    //final merge is disabled. Final output file would not be populated in this case.
+    assertTrue(sorter.finalOutputFile == null);
+    verify(outputContext, times(1)).sendEvents(anyListOf(Event.class));
+  }
+
   public void basicTest(int partitions, int numKeys, int keySize,
       long initialAvailableMem, int blockSize) throws IOException {
     this.numOutputs = partitions; // single output
     PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
         initialAvailableMem, blockSize);
 
-    //Write 100 keys each of size 10
     writeData(sorter, numKeys, keySize);
 
     Path outputFile = sorter.finalOutputFile;
@@ -225,8 +252,18 @@ public class TestPipelinedSorter {
   }
 
   private OutputContext createMockOutputContext(TezCounters counters, ApplicationId appId,
-      String uniqueId) {
+      String uniqueId) throws IOException {
     OutputContext outputContext = mock(OutputContext.class);
+
+    ExecutionContext execContext = new ExecutionContextImpl("localhost");
+
+    DataOutputBuffer serviceProviderMetaData = new DataOutputBuffer();
+    serviceProviderMetaData.writeInt(80);
+    doReturn(ByteBuffer.wrap(serviceProviderMetaData.getData())).when(outputContext)
+        .getServiceProviderMetaData
+            (ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
+
+    doReturn(execContext).when(outputContext).getExecutionContext();
     doReturn(counters).when(outputContext).getCounters();
     doReturn(appId).when(outputContext).getApplicationId();
     doReturn(1).when(outputContext).getDAGAttemptNumber();

http://git-wip-us.apache.org/repos/asf/tez/blob/c85b724d/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
index 56c6e38..875f23a 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
@@ -25,10 +25,14 @@ import static org.mockito.Matchers.anyLong;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.internal.verification.VerificationModeFactory.times;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.List;
 
+import com.google.protobuf.ByteString;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -37,18 +41,27 @@ import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.ExecutionContext;
 import org.apache.tez.runtime.api.MemoryUpdateCallback;
 import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.apache.tez.runtime.library.common.sort.impl.ExternalSorter;
 import org.apache.tez.runtime.library.partitioner.HashPartitioner;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -80,11 +93,17 @@ public class TestDefaultSorter {
     conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, localDirs);
   }
 
-  @After
-  public void cleanup() throws IOException {
+  @AfterClass
+  public static void cleanup() throws IOException {
     localFs.delete(workingDir, true);
   }
 
+  @After
+  public void reset() throws IOException {
+    cleanup();
+    localFs.mkdirs(workingDir);
+  }
+
   @Test(timeout = 5000)
   public void testSortSpillPercent() throws Exception {
     OutputContext context = createTezOutputContext();
@@ -131,12 +150,116 @@ public class TestDefaultSorter {
     //Write 1000 keys each of size 1000, (> 1 spill should happen)
     try {
       writeData(sorter, 1000, 1000);
-      assertTrue(sorter.numSpills > 2);
+      assertTrue(sorter.getNumSpills() > 2);
     } catch(IOException ioe) {
       fail(ioe.getMessage());
     }
   }
 
+  @Test(timeout = 30000)
+  public void testWithEmptyData() throws IOException {
+    OutputContext context = createTezOutputContext();
+
+    conf.setLong(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 1);
+    MemoryUpdateCallbackHandler handler = new MemoryUpdateCallbackHandler();
+    context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement(conf,
+        context.getTotalMemoryAvailableToTask()), handler);
+    DefaultSorter sorter = new DefaultSorter(context, conf, 1, handler.getMemoryAssigned());
+
+    //no data written. Empty
+    try {
+      sorter.flush();
+      sorter.close();
+      assertTrue(sorter.getFinalOutputFile().getParent().getName().equalsIgnoreCase(UniqueID));
+    } catch(Exception e) {
+      fail();
+    }
+  }
+
+  @Test(timeout = 30000)
+  public void testWithEmptyDataWithFinalMergeDisabled() throws IOException {
+    OutputContext context = createTezOutputContext();
+
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_SORTER, false);
+    conf.setLong(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 1);
+    MemoryUpdateCallbackHandler handler = new MemoryUpdateCallbackHandler();
+    context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement(conf,
+        context.getTotalMemoryAvailableToTask()), handler);
+    DefaultSorter sorter = new DefaultSorter(context, conf, 1, handler.getMemoryAssigned());
+
+    //no data written. Empty
+    try {
+      sorter.flush();
+      sorter.close();
+      assertTrue(sorter.getFinalOutputFile().getParent().getName().equalsIgnoreCase(UniqueID +
+          "_0"));
+      assertTrue(context.getCounters().findCounter(TaskCounter.OUTPUT_BYTES_PHYSICAL).getValue() > 0);
+    } catch(Exception e) {
+      fail();
+    }
+  }
+
+  @Test(timeout = 30000)
+  @SuppressWarnings("unchecked")
+  public void testWithSingleSpillWithFinalMergeDisabled() throws IOException {
+    OutputContext context = createTezOutputContext();
+
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_SORTER, false);
+    conf.setLong(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 4);
+    MemoryUpdateCallbackHandler handler = new MemoryUpdateCallbackHandler();
+    context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement(conf,
+        context.getTotalMemoryAvailableToTask()), handler);
+    DefaultSorter sorter = new DefaultSorter(context, conf, 1, handler.getMemoryAssigned());
+
+    writeData(sorter, 1000, 10);
+    assertTrue(sorter.getNumSpills() == 1);
+    ArgumentCaptor<List> eventCaptor = ArgumentCaptor.forClass(List.class);
+    verify(context, times(1)).sendEvents(eventCaptor.capture());
+    List<Event> events = eventCaptor.getValue();
+    for(Event event : events) {
+      if (event instanceof CompositeDataMovementEvent) {
+        CompositeDataMovementEvent cdme = (CompositeDataMovementEvent) event;
+        ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload = ShuffleUserPayloads
+            .DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(cdme.getUserPayload()));
+        assertTrue(shufflePayload.getPathComponent().equalsIgnoreCase(UniqueID + "_0"));
+      }
+    }
+
+    assertTrue(context.getCounters().findCounter(TaskCounter.OUTPUT_BYTES_PHYSICAL).getValue() > 0);
+  }
+
+  @Test(timeout = 30000)
+  @SuppressWarnings("unchecked")
+  public void testWithMultipleSpillsWithFinalMergeDisabled() throws IOException {
+    OutputContext context = createTezOutputContext();
+
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_SORTER, false);
+    conf.setLong(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 4);
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES, 1);
+    MemoryUpdateCallbackHandler handler = new MemoryUpdateCallbackHandler();
+    context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement(conf,
+        context.getTotalMemoryAvailableToTask()), handler);
+    DefaultSorter sorter = new DefaultSorter(context, conf, 1, handler.getMemoryAssigned());
+
+    writeData(sorter, 10000, 1000);
+    int spillCount = sorter.getNumSpills();
+    ArgumentCaptor<List> eventCaptor = ArgumentCaptor.forClass(List.class);
+    verify(context, times(1)).sendEvents(eventCaptor.capture());
+    List<Event> events = eventCaptor.getValue();
+    int spillIndex = 0;
+    for(Event event : events) {
+      if (event instanceof CompositeDataMovementEvent) {
+        CompositeDataMovementEvent cdme = (CompositeDataMovementEvent) event;
+        ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload = ShuffleUserPayloads
+            .DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(cdme.getUserPayload()));
+        assertTrue(shufflePayload.getPathComponent().equalsIgnoreCase(UniqueID + "_" + spillIndex));
+        spillIndex++;
+      }
+    }
+    assertTrue(spillIndex == spillCount);
+    assertTrue(context.getCounters().findCounter(TaskCounter.OUTPUT_BYTES_PHYSICAL).getValue() > 0);
+  }
+
   private void writeData(ExternalSorter sorter, int numKeys, int keyLen) throws IOException {
     for (int i = 0; i < numKeys; i++) {
       Text key = new Text(RandomStringUtils.randomAlphanumeric(keyLen));
@@ -156,6 +279,8 @@ public class TestDefaultSorter {
     TezCounters counters = new TezCounters();
 
     OutputContext context = mock(OutputContext.class);
+    ExecutionContext execContext = new ExecutionContextImpl("localhost");
+    doReturn(execContext).when(context).getExecutionContext();
     doReturn(counters).when(context).getCounters();
     doReturn(workingDirs).when(context).getWorkDirs();
     doReturn(payLoad).when(context).getUserPayload();

http://git-wip-us.apache.org/repos/asf/tez/blob/c85b724d/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
index 4da62cb..e0b75b8 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
@@ -58,6 +58,7 @@ import java.util.List;
 import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
@@ -161,6 +162,72 @@ public class TestOnFileSortedOutput {
   }
 
   @Test (timeout = 5000)
+  public void testPipelinedShuffle() throws Exception {
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 3);
+
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS, 2);
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_SORTER, false);
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, true);
+    OutputContext context = createTezOutputContext();
+    UserPayload payLoad = TezUtils.createUserPayloadFromConf(conf);
+    doReturn(payLoad).when(context).getUserPayload();
+    sortedOutput = new OrderedPartitionedKVOutput(context, partitions);
+
+    sortedOutput.initialize();
+    sortedOutput.start();
+
+    assertFalse(sortedOutput.finalMergeEnabled);
+    assertTrue(sortedOutput.pipelinedShuffle);
+
+  }
+
+  @Test (timeout = 5000)
+  public void testPipelinedShuffleWithFinalMerge() throws Exception {
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 3);
+
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS, 2);
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_SORTER, true);
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, true);
+    OutputContext context = createTezOutputContext();
+    UserPayload payLoad = TezUtils.createUserPayloadFromConf(conf);
+    doReturn(payLoad).when(context).getUserPayload();
+    sortedOutput = new OrderedPartitionedKVOutput(context, partitions);
+
+    sortedOutput.initialize();
+    try {
+      sortedOutput.start();
+      fail("Should have thrown illegal arguement exception as final merge & pipelining are "
+          + "enabled together");
+    } catch(IllegalArgumentException ie) {
+      assertTrue(ie.getMessage().contains("has to be set to false for pipelined"));
+    }
+  }
+
+  @Test
+  public void testPipelinedSettingsWithDefaultSorter() throws Exception {
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 3);
+    //negative case: sort threads = 1 (DefaultSorter), so pipelined shuffle must be rejected
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS, 1);
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_SORTER, false);
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, true);
+
+    OutputContext context = createTezOutputContext();
+    UserPayload payLoad = TezUtils.createUserPayloadFromConf(conf);
+    doReturn(payLoad).when(context).getUserPayload();
+    sortedOutput = new OrderedPartitionedKVOutput(context, partitions);
+
+    sortedOutput.initialize();
+    try {
+      sortedOutput.start();
+      fail("Should have thrown illegal argument exception as pipelining is enabled with "
+          + "DefaultSorter");
+    } catch(IllegalArgumentException ie) {
+      assertTrue(ie.getMessage().contains("works with PipelinedSorter"));
+    }
+
+  }
+
+  @Test (timeout = 5000)
   public void testSortBufferSize() throws Exception{
     OutputContext context = createTezOutputContext();
     conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 2048);


Mime
View raw message