tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rbalamo...@apache.org
Subject tez git commit: TEZ-3698: UnorderedKV writer should be able to honor tez.runtime.enable.final-merge.in.output without pipelinedshuffle (rbalamohan)
Date Tue, 13 Jun 2017 03:06:56 GMT
Repository: tez
Updated Branches:
  refs/heads/master 29b45bc11 -> a70e16326


TEZ-3698: UnorderedKV writer should be able to honor tez.runtime.enable.final-merge.in.output
without pipelinedshuffle (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a70e1632
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a70e1632
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a70e1632

Branch: refs/heads/master
Commit: a70e163264c17ab7ee3184e79e8919c4e3bc744c
Parents: 29b45bc
Author: Rajesh Balamohan <rbalamohan@apache.org>
Authored: Tue Jun 13 08:36:43 2017 +0530
Committer: Rajesh Balamohan <rbalamohan@apache.org>
Committed: Tue Jun 13 08:36:43 2017 +0530

----------------------------------------------------------------------
 .../writers/UnorderedPartitionedKVWriter.java   | 106 +++++++++++++------
 .../TestUnorderedPartitionedKVWriter.java       |  49 ++++++---
 .../output/TestOnFileUnorderedKVOutput.java     |   1 +
 3 files changed, 112 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/a70e1632/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index bcc9cf9..6bdb9e4 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -178,6 +178,9 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
   private final Condition spillInProgress = spillLock.newCondition();
 
   private final boolean pipelinedShuffle;
+  private final boolean isFinalMergeEnabled;
+  // To store events when final merge is disabled
+  private final List<Event> finalEvents;
   // How partition stats should be reported.
   final ReportPartitionStats reportPartitionStats;
 
@@ -194,9 +197,14 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     this.destNameTrimmed = TezUtilsInternal.cleanVertexName(outputContext.getDestinationVertexName());
     //Not checking for TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT as it might not add much value in
     // this case.  Add it later if needed.
-    pipelinedShuffle = this.conf.getBoolean(TezRuntimeConfiguration
+    boolean pipelinedShuffleConf = this.conf.getBoolean(TezRuntimeConfiguration
         .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, TezRuntimeConfiguration
         .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED_DEFAULT);
+    this.isFinalMergeEnabled = conf.getBoolean(
+        TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT,
+        TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT_DEFAULT);
+    this.pipelinedShuffle = pipelinedShuffleConf && !isFinalMergeEnabled;
+    this.finalEvents = Lists.newLinkedList();
 
     if (availableMemoryBytes == 0) {
       Preconditions.checkArgument(((numPartitions == 1) && !pipelinedShuffle), "availableMemory "
@@ -385,7 +393,6 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
       }
     }
 
-
     int valStart = currentBuffer.nextPosition;
     valSerializer.serialize(value);
 
@@ -725,15 +732,29 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
         }
 
         //Regular code path.
-        if (numSpills.get() > 0) {
+        boolean updatedCounters = false;
+        if (numSpills.get() > 0 && isFinalMergeEnabled) {
           mergeAll();
         } else {
-          finalSpill();
+          if (finalSpill() && !isFinalMergeEnabled) {
+            //final spill generated some data. Add it to final events
+            updateTezCountersAndNotify();
+            updatedCounters = true;
+            finalEvents.add(generateVMEvent());
+            finalEvents.add(generateDMEvent());
+          }
+        }
+        if (!updatedCounters) {
+          updateTezCountersAndNotify();
         }
-        updateTezCountersAndNotify();
         cleanupCurrentBuffer();
-        eventList.add(generateVMEvent());
-        eventList.add(generateDMEvent());
+        if (isFinalMergeEnabled) {
+          eventList.add(generateVMEvent());
+          eventList.add(generateDMEvent());
+        } else {
+          //all events to be sent out are added in finalEvents.
+          eventList.addAll(finalEvents);
+        }
         return eventList;
       }
 
@@ -741,7 +762,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
       if (finalSpill()) {
         // VertexManagerEvent is only sent at the end and thus sizePerPartition is used
         // for the sum of all spills.
-        sendPipelinedEventForSpill(currentBuffer.recordsPerPartition,
+        mayBeSendEventsForSpill(currentBuffer.recordsPerPartition,
             sizePerPartition, numSpills.get() - 1, true);
       }
       updateTezCountersAndNotify();
@@ -828,7 +849,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
 
   private boolean finalSpill() throws IOException {
     if (currentBuffer.nextPosition == 0) {
-      if (pipelinedShuffle) {
+      if (pipelinedShuffle || !isFinalMergeEnabled) {
         List<Event> eventList = Lists.newLinkedList();
         eventList.add(ShuffleUtils.generateVMEvent(outputContext,
             reportPartitionStats() ? new long[numPartitions] : null,
@@ -838,7 +859,11 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
         emptyPartitions.flip(0, numPartitions);
         eventList.add(generateDMEvent(true, numSpills.get(), true,
             null, emptyPartitions));
-        outputContext.sendEvents(eventList);
+        if (pipelinedShuffle) {
+          outputContext.sendEvents(eventList);
+        } else if (!isFinalMergeEnabled) {
+          finalEvents.addAll(eventList);
+        }
       }
       return false;
     } else {
@@ -1011,7 +1036,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
       final Path outPath = spillPathDetails.outputFilePath;
       out = rfs.create(outPath);
       BitSet emptyPartitions = null;
-      if (pipelinedShuffle) {
+      if (pipelinedShuffle || !isFinalMergeEnabled) {
         emptyPartitions = new BitSet(numPartitions);
       }
       for (int i = 0; i < numPartitions; i++) {
@@ -1049,7 +1074,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
       }
       handleSpillIndex(spillPathDetails, spillRecord);
 
-      sendPipelinedEventForSpill(emptyPartitions, sizePerPartition,
+      mayBeSendEventsForSpill(emptyPartitions, sizePerPartition,
           spillIndex, false);
 
       LOG.info(destNameTrimmed + ": " + "Finished writing large record of size " + outSize + " to spill file " + spillIndex);
@@ -1158,36 +1183,53 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     }
   }
 
-  private void sendPipelinedEventForSpill(
-      BitSet emptyPartitions, long[] sizePerPartition, int spillNumber,
-      boolean isFinalUpdate) {
-    if (!pipelinedShuffle) {
-      return;
-    }
+  private List<Event> generateEventForSpill(BitSet emptyPartitions, long[] sizePerPartition,
+      int spillNumber,
+      boolean isFinalUpdate) throws IOException {
     List<Event> eventList = Lists.newLinkedList();
     //Send out an event for consuming.
+    String pathComponent = (outputContext.getUniqueIdentifier() + "_" + spillNumber);
+    if (isFinalUpdate) {
+      eventList.add(ShuffleUtils.generateVMEvent(outputContext,
+          sizePerPartition, reportDetailedPartitionStats(), deflater.get()));
+    }
+    Event compEvent = generateDMEvent(true, spillNumber, isFinalUpdate,
+        pathComponent, emptyPartitions);
+    eventList.add(compEvent);
+    return eventList;
+  }
+
+  private void mayBeSendEventsForSpill(
+      BitSet emptyPartitions, long[] sizePerPartition,
+      int spillNumber, boolean isFinalUpdate) {
+    if (!pipelinedShuffle) {
+      if (isFinalMergeEnabled) {
+        return;
+      }
+    }
+    List<Event> events = null;
     try {
-      String pathComponent = (outputContext.getUniqueIdentifier() + "_" + spillNumber);
-      if (isFinalUpdate) {
-        eventList.add(ShuffleUtils.generateVMEvent(outputContext,
-            sizePerPartition, reportDetailedPartitionStats(), deflater.get()));
+      events = generateEventForSpill(emptyPartitions, sizePerPartition, spillNumber,
+          isFinalUpdate);
+      LOG.info(destNameTrimmed + ": " + "Adding spill event for spill"
+          + " (final update=" + isFinalUpdate + "), spillId=" + spillNumber);
+      if (pipelinedShuffle) {
+        //Send out an event for consuming.
+        outputContext.sendEvents(events);
+      } else if (!isFinalMergeEnabled) {
+        this.finalEvents.addAll(events);
       }
-      Event compEvent = generateDMEvent(true, spillNumber, isFinalUpdate,
-          pathComponent, emptyPartitions);
-      eventList.add(compEvent);
-
-      LOG.info(destNameTrimmed + ": " + "Adding spill event for spill (final update=" + isFinalUpdate + "), spillId=" + spillNumber);
-      outputContext.sendEvents(eventList);
     } catch (IOException e) {
       LOG.error(destNameTrimmed + ": " + "Error in sending pipelined events", e);
-      outputContext.reportFailure(TaskFailureType.NON_FATAL, e, "Error in sending pipelined events");
+      outputContext.reportFailure(TaskFailureType.NON_FATAL, e,
+          "Error in sending events.");
     }
   }
 
-  private void sendPipelinedEventForSpill(int[] recordsPerPartition,
+  private void mayBeSendEventsForSpill(int[] recordsPerPartition,
       long[] sizePerPartition, int spillNumber, boolean isFinalUpdate) {
     BitSet emptyPartitions = getEmptyPartitions(recordsPerPartition);
-    sendPipelinedEventForSpill(emptyPartitions, sizePerPartition, spillNumber,
+    mayBeSendEventsForSpill(emptyPartitions, sizePerPartition, spillNumber,
         isFinalUpdate);
   }
 
@@ -1221,7 +1263,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
         }
       }
 
-      sendPipelinedEventForSpill(recordsPerPartition, sizePerPartition, spillNumber, false);
+      mayBeSendEventsForSpill(recordsPerPartition, sizePerPartition, spillNumber, false);
 
       try {
         for (WrappedBuffer buffer : result.filledBuffers) {

http://git-wip-us.apache.org/repos/asf/tez/blob/a70e1632/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
index 1a0bbf0..bbe0992 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
@@ -287,31 +287,57 @@ public class TestUnorderedPartitionedKVWriter {
 
   @Test(timeout = 10000)
   public void testRandomText() throws IOException, InterruptedException {
-    textTest(100, 10, 2048, 0, 0, 0, false);
+    textTest(100, 10, 2048, 0, 0, 0, false, true);
   }
 
   @Test(timeout = 10000)
   public void testLargeKeys() throws IOException, InterruptedException {
-    textTest(0, 10, 2048, 10, 0, 0, false);
+    textTest(0, 10, 2048, 10, 0, 0, false, true);
   }
 
   @Test(timeout = 10000)
   public void testLargevalues() throws IOException, InterruptedException {
-    textTest(0, 10, 2048, 0, 10, 0, false);
+    textTest(0, 10, 2048, 0, 10, 0, false, true);
   }
 
   @Test(timeout = 10000)
   public void testLargeKvPairs() throws IOException, InterruptedException {
-    textTest(0, 10, 2048, 0, 0, 10, false);
+    textTest(0, 10, 2048, 0, 0, 10, false, true);
   }
 
   @Test(timeout = 10000)
   public void testTextMixedRecords() throws IOException, InterruptedException {
-    textTest(100, 10, 2048, 10, 10, 10, false);
+    textTest(100, 10, 2048, 10, 10, 10, false, true);
+  }
+
+  @Test(timeout = 10000000)
+  public void testRandomTextWithoutFinalMerge() throws IOException, InterruptedException {
+    textTest(100, 10, 2048, 0, 0, 0, false, false);
+  }
+
+  @Test(timeout = 10000)
+  public void testLargeKeysWithoutFinalMerge() throws IOException, InterruptedException {
+    textTest(0, 10, 2048, 10, 0, 0, false, false);
+  }
+
+  @Test(timeout = 10000)
+  public void testLargevaluesWithoutFinalMerge() throws IOException, InterruptedException {
+    textTest(0, 10, 2048, 0, 10, 0, false, false);
+  }
+
+  @Test(timeout = 10000)
+  public void testLargeKvPairsWithoutFinalMerge() throws IOException, InterruptedException {
+    textTest(0, 10, 2048, 0, 0, 10, false, false);
+  }
+
+  @Test(timeout = 10000)
+  public void testTextMixedRecordsWithoutFinalMerge() throws IOException, InterruptedException {
+    textTest(100, 10, 2048, 10, 10, 10, false, false);
   }
 
   public void textTest(int numRegularRecords, int numPartitions, long availableMemory,
-      int numLargeKeys, int numLargevalues, int numLargeKvPairs, boolean pipeliningEnabled) throws IOException,
+      int numLargeKeys, int numLargevalues, int numLargeKvPairs,
+      boolean pipeliningEnabled, boolean isFinalMergeEnabled) throws IOException,
       InterruptedException {
     Partitioner partitioner = new HashPartitioner();
     ApplicationId appId = ApplicationId.newInstance(10000000, 1);
@@ -325,10 +351,9 @@ public class TestUnorderedPartitionedKVWriter {
 
     Configuration conf = createConfiguration(outputContext, Text.class, Text.class, shouldCompress,
         -1, HashPartitioner.class);
-    if (pipeliningEnabled) {
-      conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, true);
-      conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false);
-    }
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, pipeliningEnabled);
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, isFinalMergeEnabled);
+
     CompressionCodec codec = null;
     if (shouldCompress) {
       codec = new DefaultCodec();
@@ -432,7 +457,7 @@ public class TestUnorderedPartitionedKVWriter {
     assertEquals(numLargeKeys + numLargevalues + numLargeKvPairs,
         outputLargeRecordsCounter.getValue());
 
-    if (pipeliningEnabled) {
+    if (pipeliningEnabled || !isFinalMergeEnabled) {
       return;
     }
 
@@ -590,7 +615,7 @@ public class TestUnorderedPartitionedKVWriter {
 
   @Test(timeout = 10000)
   public void testLargeKvPairs_WithPipelinedShuffle() throws IOException, InterruptedException {
-    textTest(0, 10, 2048, 10, 20, 50, true);
+    textTest(0, 10, 2048, 10, 20, 50, true, false);
   }
 
 

http://git-wip-us.apache.org/repos/asf/tez/blob/a70e1632/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
index a591ee9..393ac2e 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
@@ -168,6 +168,7 @@ public class TestOnFileUnorderedKVOutput {
     conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName());
     conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, IntWritable.class.getName());
     conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, true);
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false);
 
     conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB, 1);
 


Mime
View raw message