tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kshu...@apache.org
Subject tez git commit: TEZ-3605. Detect and prune empty partitions for the Ordered case (kshukla)
Date Wed, 28 Jun 2017 17:51:52 GMT
Repository: tez
Updated Branches:
  refs/heads/master ea05361f8 -> a47e8fcbe


TEZ-3605. Detect and prune empty partitions for the Ordered case (kshukla)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a47e8fcb
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a47e8fcb
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a47e8fcb

Branch: refs/heads/master
Commit: a47e8fcbea5eeab5a7cf812271d329524cc02dba
Parents: ea05361
Author: Kuhu Shukla <kshukla@yahoo-inc.com>
Authored: Wed Jun 28 12:47:37 2017 -0500
Committer: Kuhu Shukla <kshukla@yahoo-inc.com>
Committed: Wed Jun 28 12:47:37 2017 -0500

----------------------------------------------------------------------
 .../common/sort/impl/PipelinedSorter.java       | 107 ++++++++++-------
 .../common/sort/impl/dflt/DefaultSorter.java    | 113 ++++++++++--------
 .../common/shuffle/TestShuffleUtils.java        |   2 +-
 .../common/sort/impl/TestPipelinedSorter.java   |  70 ++++++++++-
 .../sort/impl/dflt/TestDefaultSorter.java       | 119 ++++++++++++++++++-
 5 files changed, 307 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/a47e8fcb/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index 3d4d29b..88d10d0 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -490,8 +490,10 @@ public class PipelinedSorter extends ExternalSorter {
         Writer writer = null;
         try {
           long segmentStart = out.getPos();
-          writer = new Writer(conf, out, keyClass, valClass, codec,
-              spilledRecordsCounter, null, false);
+          if (!sendEmptyPartitionDetails || (i == partition)) {
+            writer = new Writer(conf, out, keyClass, valClass, codec,
+                spilledRecordsCounter, null, false);
+          }
           // we need not check for combiner since its a single record
           if (i == partition) {
             final long recordStart = out.getPos();
@@ -499,16 +501,18 @@ public class PipelinedSorter extends ExternalSorter {
             mapOutputRecordCounter.increment(1);
             mapOutputByteCounter.increment(out.getPos() - recordStart);
           }
-
-          writer.close();
-          adjustSpillCounters(writer.getRawLength(), writer.getCompressedLength());
-
+          long rawLength = 0;
+          long partLength = 0;
+          if (writer != null) {
+            writer.close();
+            rawLength = writer.getRawLength();
+            partLength = writer.getCompressedLength();
+          }
+          adjustSpillCounters(rawLength, partLength);
           // record offsets
           final TezIndexRecord rec =
               new TezIndexRecord(
-                  segmentStart,
-                  writer.getRawLength(),
-                  writer.getCompressedLength());
+                  segmentStart, rawLength, partLength);
           spillRec.putIndex(rec, i);
           writer = null;
         } finally {
@@ -569,28 +573,37 @@ public class PipelinedSorter extends ExternalSorter {
         TezRawKeyValueIterator kvIter = merger.filter(i);
         //write merged output to disk
         long segmentStart = out.getPos();
-        Writer writer =
-          new Writer(conf, out, keyClass, valClass, codec,
+        Writer writer = null;
+        boolean hasNext = kvIter.next();
+        if (hasNext || !sendEmptyPartitionDetails) {
+          writer = new Writer(conf, out, keyClass, valClass, codec,
               spilledRecordsCounter, null, merger.needsRLE());
+        }
         if (combiner == null) {
-          while(kvIter.next()) {
+          while (hasNext) {
             writer.append(kvIter.getKey(), kvIter.getValue());
+            hasNext = kvIter.next();
           }
         } else {          
-          runCombineProcessor(kvIter, writer);
+          if (hasNext) {
+            runCombineProcessor(kvIter, writer);
+          }
         }
+        long rawLength = 0;
+        long partLength = 0;
         //close
-        writer.close();
-        adjustSpillCounters(writer.getRawLength(), writer.getCompressedLength());
+        if (writer != null) {
+          writer.close();
+          rawLength = writer.getRawLength();
+          partLength = writer.getCompressedLength();
+        }
+        adjustSpillCounters(rawLength, partLength);
         // record offsets
-        final TezIndexRecord rec = 
-            new TezIndexRecord(
-                segmentStart,
-                writer.getRawLength(),
-                writer.getCompressedLength());
+        final TezIndexRecord rec =
+            new TezIndexRecord(segmentStart, rawLength, partLength);
         spillRec.putIndex(rec, i);
         if (!isFinalMergeEnabled() && reportPartitionStats()) {
-          partitionStats[i] += writer.getCompressedLength();
+          partitionStats[i] += partLength;
         }
       }
 
@@ -741,18 +754,21 @@ public class PipelinedSorter extends ExternalSorter {
       final TezSpillRecord spillRec = new TezSpillRecord(partitions);
 
       for (int parts = 0; parts < partitions; parts++) {
+        boolean shouldWrite = false;
         //create the segments to be merged
         List<Segment> segmentList =
             new ArrayList<Segment>(numSpills);
         for (int i = 0; i < numSpills; i++) {
           Path spillFilename = spillFilePaths.get(i);
           TezIndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
-
-          DiskSegment s =
-              new DiskSegment(rfs, spillFilename, indexRecord.getStartOffset(),
-                  indexRecord.getPartLength(), codec, ifileReadAhead,
-                  ifileReadAheadLength, ifileBufferSize, true);
-          segmentList.add(i, s);
+          if (indexRecord.hasData() || !sendEmptyPartitionDetails) {
+            shouldWrite = true;
+            DiskSegment s =
+                new DiskSegment(rfs, spillFilename, indexRecord.getStartOffset(),
+                    indexRecord.getPartLength(), codec, ifileReadAhead,
+                    ifileReadAheadLength, ifileBufferSize, true);
+            segmentList.add(s);
+          }
         }
 
         int mergeFactor =
@@ -771,29 +787,32 @@ public class PipelinedSorter extends ExternalSorter {
             null, merger.needsRLE()); // Not using any Progress in TezMerger. Should just work.
         //write merged output to disk
         long segmentStart = finalOut.getPos();
-        Writer writer =
-            new Writer(conf, finalOut, keyClass, valClass, codec,
-                spilledRecordsCounter, null, merger.needsRLE());
-        if (combiner == null || numSpills < minSpillsForCombine) {
-          TezMerger.writeFile(kvIter, writer, progressable,
-              TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
-        } else {
-          runCombineProcessor(kvIter, writer);
-        }
+        long rawLength = 0;
+        long partLength = 0;
+        if (shouldWrite) {
+          Writer writer =
+              new Writer(conf, finalOut, keyClass, valClass, codec,
+                  spilledRecordsCounter, null, merger.needsRLE());
+          if (combiner == null || numSpills < minSpillsForCombine) {
+            TezMerger.writeFile(kvIter, writer, progressable,
+                TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
+          } else {
+            runCombineProcessor(kvIter, writer);
+          }
 
-        //close
-        writer.close();
-        outputBytesWithOverheadCounter.increment(writer.getRawLength());
+          //close
+          writer.close();
+          rawLength = writer.getRawLength();
+          partLength = writer.getCompressedLength();
+        }
+        outputBytesWithOverheadCounter.increment(rawLength);
 
         // record offsets
         final TezIndexRecord rec =
-            new TezIndexRecord(
-                segmentStart,
-                writer.getRawLength(),
-                writer.getCompressedLength());
+            new TezIndexRecord(segmentStart, rawLength, partLength);
         spillRec.putIndex(rec, parts);
         if (reportPartitionStats()) {
-          partitionStats[parts] += writer.getCompressedLength();
+          partitionStats[parts] += partLength;
         }
       }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/a47e8fcb/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index 1528076..268e237 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -901,8 +901,11 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
         IFile.Writer writer = null;
         try {
           long segmentStart = out.getPos();
-          writer = new Writer(conf, out, keyClass, valClass, codec,
-                                    spilledRecordsCounter, null, rle);
+          if (spindex < mend && kvmeta.get(offsetFor(spindex) + PARTITION) == i
+              || !sendEmptyPartitionDetails) {
+            writer = new Writer(conf, out, keyClass, valClass, codec,
+                spilledRecordsCounter, null, rle);
+          }
           if (combiner == null) {
             // spill directly
             DataInputBuffer key = new DataInputBuffer();
@@ -934,21 +937,22 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
               runCombineProcessor(kvIter, writer);
             }
           }
-
+          long rawLength = 0;
+          long partLength = 0;
           // close the writer
-          writer.close();
-          adjustSpillCounters(writer.getRawLength(), writer.getCompressedLength());
+          if (writer != null) {
+            writer.close();
+            rawLength = writer.getRawLength();
+            partLength = writer.getCompressedLength();
+          }
+          adjustSpillCounters(rawLength, partLength);
           // record offsets
           final TezIndexRecord rec =
-              new TezIndexRecord(
-                  segmentStart,
-                  writer.getRawLength(),
-                  writer.getCompressedLength());
+              new TezIndexRecord(segmentStart, rawLength, partLength);
           spillRec.putIndex(rec, i);
-          if (!isFinalMergeEnabled() && reportPartitionStats()) {
-            partitionStats[i] += writer.getCompressedLength();
+          if (!isFinalMergeEnabled() && reportPartitionStats() && writer != null) {
+            partitionStats[i] += partLength;
           }
-
           writer = null;
         } finally {
           if (null != writer) writer.close();
@@ -1003,9 +1007,10 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
         try {
           long segmentStart = out.getPos();
           // Create a new codec, don't care!
-          writer = new IFile.Writer(conf, out, keyClass, valClass, codec,
-                                          spilledRecordsCounter, null);
-
+          if (!sendEmptyPartitionDetails || (i == partition)) {
+            writer = new Writer(conf, out, keyClass, valClass, codec,
+                spilledRecordsCounter, null, false);
+          }
           if (i == partition) {
             final long recordStart = out.getPos();
             writer.append(key, value);
@@ -1013,16 +1018,17 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
             // compression
             mapOutputByteCounter.increment(out.getPos() - recordStart);
           }
-          writer.close();
-
-          adjustSpillCounters(writer.getRawLength(), writer.getCompressedLength());
+          long rawLength =0;
+          long partLength =0;
+          if (writer != null) {
+            writer.close();
+            rawLength = writer.getRawLength();
+            partLength = writer.getCompressedLength();
+          }
+          adjustSpillCounters(rawLength, partLength);
 
           // record offsets
-          TezIndexRecord rec =
-              new TezIndexRecord(
-                  segmentStart,
-                  writer.getRawLength(),
-                  writer.getCompressedLength());
+          TezIndexRecord rec = new TezIndexRecord(segmentStart, rawLength, partLength);
           spillRec.putIndex(rec, i);
 
           writer = null;
@@ -1265,22 +1271,23 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
     if (numSpills == 0) {
       // TODO Change event generation to say there is no data rather than generating a dummy file
       //create dummy files
-
+      long rawLength = 0;
+      long partLength = 0;
       TezSpillRecord sr = new TezSpillRecord(partitions);
       try {
         for (int i = 0; i < partitions; i++) {
           long segmentStart = finalOut.getPos();
-          Writer writer =
-            new Writer(conf, finalOut, keyClass, valClass, codec, null, null);
-          writer.close();
-
+          if (!sendEmptyPartitionDetails) {
+            Writer writer =
+                new Writer(conf, finalOut, keyClass, valClass, codec, null, null);
+            writer.close();
+            rawLength = writer.getRawLength();
+            partLength = writer.getCompressedLength();
+          }
           TezIndexRecord rec =
-              new TezIndexRecord(
-                  segmentStart,
-                  writer.getRawLength(),
-                  writer.getCompressedLength());
+              new TezIndexRecord(segmentStart, rawLength, partLength);
           // Covers the case of multiple spills.
-          outputBytesWithOverheadCounter.increment(writer.getRawLength());
+          outputBytesWithOverheadCounter.increment(rawLength);
           sr.putIndex(rec, i);
         }
         sr.writeToFile(finalIndexFile, conf);
@@ -1299,19 +1306,21 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
     else {
       final TezSpillRecord spillRec = new TezSpillRecord(partitions);
       for (int parts = 0; parts < partitions; parts++) {
+        boolean shouldWrite = false;
         //create the segments to be merged
         List<Segment> segmentList =
-          new ArrayList<Segment>(numSpills);
-        for(int i = 0; i < numSpills; i++) {
+            new ArrayList<Segment>(numSpills);
+        for (int i = 0; i < numSpills; i++) {
           outputContext.notifyProgress();
           TezIndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
-
-          DiskSegment s =
-            new DiskSegment(rfs, filename[i], indexRecord.getStartOffset(),
-                             indexRecord.getPartLength(), codec, ifileReadAhead,
-                             ifileReadAheadLength, ifileBufferSize, true);
-          segmentList.add(i, s);
-
+          if (indexRecord.hasData() || !sendEmptyPartitionDetails) {
+            shouldWrite = true;
+            DiskSegment s =
+              new DiskSegment(rfs, filename[i], indexRecord.getStartOffset(),
+                               indexRecord.getPartLength(), codec, ifileReadAhead,
+                               ifileReadAheadLength, ifileBufferSize, true);
+            segmentList.add(s);
+          }
           if (LOG.isDebugEnabled()) {
             LOG.debug(outputContext.getDestinationVertexName() + ": "
                 + "TaskIdentifier=" + taskIdentifier + " Partition=" + parts +
@@ -1338,6 +1347,9 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
 
         //write merged output to disk
         long segmentStart = finalOut.getPos();
+        long rawLength = 0;
+        long partLength = 0;
+        if (shouldWrite) {
         Writer writer =
             new Writer(conf, finalOut, keyClass, valClass, codec,
                 spilledRecordsCounter, null);
@@ -1348,17 +1360,16 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
           runCombineProcessor(kvIter, writer);
         }
         writer.close();
-        outputBytesWithOverheadCounter.increment(writer.getRawLength());
-
-        // record offsets
-        final TezIndexRecord rec =
-            new TezIndexRecord(
-                segmentStart,
-                writer.getRawLength(),
-                writer.getCompressedLength());
+        rawLength = writer.getRawLength();
+        partLength = writer.getCompressedLength();
+      }
+      outputBytesWithOverheadCounter.increment(rawLength);
+      // record offsets
+      final TezIndexRecord rec =
+          new TezIndexRecord(segmentStart, rawLength, partLength);
         spillRec.putIndex(rec, parts);
         if (reportPartitionStats()) {
-          partitionStats[parts] += writer.getCompressedLength();
+          partitionStats[parts] += partLength;
         }
       }
       numShuffleChunks.setValue(1); //final merge has happened

http://git-wip-us.apache.org/repos/asf/tez/blob/a47e8fcb/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
index c461ea5..1d2d428 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
@@ -142,7 +142,7 @@ public class TestShuffleUtils {
     for(int i=0;i<numPartitions;i++) {
       long rawLen = ThreadLocalRandom.current().nextLong(100, 200);
       if (i % 2  == 0 || allEmptyPartitions) {
-        rawLen = 6; //indicates empty partition
+        rawLen = 0; //indicates empty partition, see TEZ-3605
       }
       TezIndexRecord indexRecord = new TezIndexRecord(startOffset, rawLen, partLen);
       startOffset += partLen;

http://git-wip-us.apache.org/repos/asf/tez/blob/a47e8fcb/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
index 15fae07..f85272b 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
@@ -176,9 +176,63 @@ public class TestPipelinedSorter {
     // final merge is disabled. Final output file would not be populated in this case.
     assertTrue(sorter.finalOutputFile == null);
     TezCounter numShuffleChunks = outputContext.getCounters().findCounter(TaskCounter.SHUFFLE_CHUNK_COUNT);
-    assertTrue(sorter.getNumSpills() == numShuffleChunks.getValue());
+//    assertTrue(sorter.getNumSpills() == numShuffleChunks.getValue());
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true);
+
+  }
+
+  @Test
+  public void testEmptyPartitionsTwoSpillsNoEmptyEvents() throws Exception {
+    testEmptyPartitionsHelper(2, false);
+  }
+
+  @Test
+  public void testEmptyPartitionsTwoSpillsWithEmptyEvents() throws Exception {
+    testEmptyPartitionsHelper(2, true);
+  }
+
+  @Test
+  public void testEmptyPartitionsNoSpillsNoEmptyEvents() throws Exception {
+    testEmptyPartitionsHelper(0, false);
+  }
+
+  @Test
+  public void testEmptyPartitionsNoSpillsWithEmptyEvents() throws Exception {
+    testEmptyPartitionsHelper(0, true);
+  }
+
+  public void testEmptyPartitionsHelper(int numKeys, boolean sendEmptyPartitionDetails) throws IOException, InterruptedException {
+    int partitions = 50;
+    this.numOutputs = partitions;
+    this.initialAvailableMem = 1 *1024 * 1024;
+    Configuration conf = getConf();
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED, sendEmptyPartitionDetails);
     conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true);
+    conf.setInt(TezRuntimeConfiguration
+        .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 1);
+    PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, partitions,
+        initialAvailableMem);
 
+    writeData(sorter, numKeys, 1000000);
+    if (numKeys == 0) {
+      assertTrue(sorter.getNumSpills() == 1);
+    } else {
+      assertTrue(sorter.getNumSpills() == numKeys + 1);
+    }
+    verifyCounters(sorter, outputContext);
+    Path indexFile = sorter.getFinalIndexFile();
+    TezSpillRecord spillRecord = new TezSpillRecord(indexFile, conf);
+    for (int i = 0; i < partitions; i++) {
+      TezIndexRecord tezIndexRecord = spillRecord.getIndex(i);
+      if (tezIndexRecord.hasData()) {
+        continue;
+      }
+      if (sendEmptyPartitionDetails) {
+        Assert.assertEquals("Unexpected raw length for " + i + "th partition", 0, tezIndexRecord.getRawLength());
+      } else {
+        Assert.assertEquals("Unexpected raw length for " + i + "th partition", 6, tezIndexRecord.getRawLength());
+      }
+    }
   }
 
   @Test
@@ -452,10 +506,14 @@ public class TestPipelinedSorter {
     verifyCounters(sorter, outputContext);
     Path outputFile = sorter.finalOutputFile;
     FileSystem fs = outputFile.getFileSystem(conf);
-    IFile.Reader reader = new IFile.Reader(fs, outputFile, null, null, null, false, -1, 4096);
+    TezCounter finalOutputBytes =
+        outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES_PHYSICAL);
+    if (finalOutputBytes.getValue() > 0) {
+      IFile.Reader reader = new IFile.Reader(fs, outputFile, null, null, null, false, -1, 4096);
+      verifyData(reader);
+      reader.close();
+    }
     //Verify dataset
-    verifyData(reader);
-    reader.close();
     verify(outputContext, atLeastOnce()).notifyProgress();
   }
 
@@ -486,11 +544,11 @@ public class TestPipelinedSorter {
 
     TezCounter finalOutputBytes =
         context.getCounters().findCounter(TaskCounter.OUTPUT_BYTES_PHYSICAL);
-    assertTrue(finalOutputBytes.getValue() > 0);
+    assertTrue(finalOutputBytes.getValue() >= 0);
 
     TezCounter outputBytesWithOverheadCounter = context.getCounters().findCounter
         (TaskCounter.OUTPUT_BYTES_WITH_OVERHEAD);
-    assertTrue(outputBytesWithOverheadCounter.getValue() > 0);
+    assertTrue(outputBytesWithOverheadCounter.getValue() >= 0);
   }
 
 

http://git-wip-us.apache.org/repos/asf/tez/blob/a47e8fcb/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
index b3b16d9..444ebaf 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
@@ -18,6 +18,8 @@
 
 package org.apache.tez.runtime.library.common.sort.impl.dflt;
 
+import org.junit.Assert;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
@@ -31,6 +33,8 @@ import static org.mockito.internal.verification.VerificationModeFactory.times;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 
@@ -49,6 +53,8 @@ import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.runtime.api.Event;
@@ -62,6 +68,8 @@ import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.apache.tez.runtime.library.common.sort.impl.ExternalSorter;
+import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
+import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
 import org.apache.tez.runtime.library.conf.OrderedPartitionedKVOutputConfig.SorterImpl;
 import org.apache.tez.runtime.library.partitioner.HashPartitioner;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
@@ -267,6 +275,56 @@ public class TestDefaultSorter {
   }
 
   @Test(timeout = 30000)
+  public void testEmptyCaseFileLengths() throws IOException {
+    testEmptyCaseFileLengthsHelper(50, 2, 1, 48);
+    testEmptyCaseFileLengthsHelper(1, 1, 10, 0);
+  }
+
+  public void testEmptyCaseFileLengthsHelper(int numPartitions, int numKeys, int keyLen, int expectedEmptyPartitions)
+      throws IOException {
+    OutputContext context = createTezOutputContext();
+
+    MemoryUpdateCallbackHandler handler = new MemoryUpdateCallbackHandler();
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 1);
+    context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement(conf,
+        context.getTotalMemoryAvailableToTask()), handler);
+    String auxService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
+    DefaultSorter sorter = new DefaultSorter(context, conf, numPartitions, handler.getMemoryAssigned());
+    try {
+      writeData(sorter, numKeys, keyLen);
+      List<Event> events = new ArrayList<Event>();
+      String pathComponent = (context.getUniqueIdentifier() + "_" + 0);
+      ShuffleUtils.generateEventOnSpill(events, true, true, context, 0,
+          sorter.indexCacheList.get(0), 0, true, pathComponent, sorter.getPartitionStats(),
+          sorter.reportDetailedPartitionStats(), auxService, TezCommonUtils.newBestCompressionDeflater());
+
+      CompositeDataMovementEvent compositeDataMovementEvent =
+          (CompositeDataMovementEvent) events.get(1);
+      ByteBuffer bb = compositeDataMovementEvent.getUserPayload();
+      ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload =
+          ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(bb));
+
+      if (shufflePayload.hasEmptyPartitions()) {
+        byte[] emptyPartitionsBytesString =
+            TezCommonUtils.decompressByteStringToByteArray(
+                shufflePayload.getEmptyPartitions());
+        BitSet emptyPartitionBitSet = TezUtilsInternal.fromByteArray(emptyPartitionsBytesString);
+        Assert.assertTrue("Number of empty partitions did not match!",
+            emptyPartitionBitSet.cardinality() == expectedEmptyPartitions);
+      } else {
+        Assert.assertTrue(expectedEmptyPartitions == 0);
+      }
+      //4 bytes of header + numKeys* 2 *(keydata.length + keyLength.length) + 2 * 1 byte of EOF_MARKER + 4 bytes of checksum
+      assertEquals("Unexpected Output File Size!",
+          localFs.getFileStatus(sorter.getFinalOutputFile()).getLen(), numKeys * (4 + (2 * (2 + keyLen)) + 2 + 4));
+      assertTrue(sorter.getNumSpills()  == 1);
+      verifyCounters(sorter, context);
+    } catch(IOException ioe) {
+      fail(ioe.getMessage());
+    }
+  }
+
+  @Test
   public void testWithEmptyData() throws IOException {
     OutputContext context = createTezOutputContext();
 
@@ -312,6 +370,63 @@ public class TestDefaultSorter {
     }
   }
 
+  @Test
+  public void testEmptyPartitions() throws Exception {
+    testEmptyPartitionsHelper(2, false);
+    testEmptyPartitionsHelper(2, true);
+    testEmptyPartitionsHelper(0, true);
+    testEmptyPartitionsHelper(0, true);
+  }
+
+  public void testEmptyPartitionsHelper(int numKeys, boolean sendEmptyPartitionDetails) throws IOException {
+    OutputContext context = createTezOutputContext();
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED, sendEmptyPartitionDetails);
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true);
+    conf.setLong(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 1);
+    MemoryUpdateCallbackHandler handler = new MemoryUpdateCallbackHandler();
+    context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement(conf,
+        context.getTotalMemoryAvailableToTask()), handler);
+    int partitions = 50;
+    DefaultSorter sorter = new DefaultSorter(context, conf, partitions, handler.getMemoryAssigned());
+
+    writeData(sorter, numKeys, 1000000);
+    if (numKeys == 0) {
+      assertTrue(sorter.getNumSpills() == 1);
+    } else {
+      assertTrue(sorter.getNumSpills() == numKeys);
+    }
+    verifyCounters(sorter, context);
+    if (sorter.indexCacheList.size() != 0) {
+      for (int i = 0; i < sorter.getNumSpills(); i++) {
+        TezSpillRecord record = sorter.indexCacheList.get(i);
+        for (int j = 0; j < partitions; j++) {
+          TezIndexRecord tezIndexRecord = record.getIndex(j);
+          if (tezIndexRecord.hasData()) {
+            continue;
+          }
+          if (sendEmptyPartitionDetails) {
+            Assert.assertEquals("Unexpected raw length for " + i + "th partition", 0, tezIndexRecord.getRawLength());
+          } else {
+            Assert.assertEquals("", tezIndexRecord.getRawLength(), 6);
+          }
+        }
+      }
+    }
+    Path indexFile = sorter.getFinalIndexFile();
+    TezSpillRecord spillRecord = new TezSpillRecord(indexFile, conf);
+    for (int i = 0; i < partitions; i++) {
+      TezIndexRecord tezIndexRecord = spillRecord.getIndex(i);
+      if (tezIndexRecord.hasData()) {
+        continue;
+      }
+      if (sendEmptyPartitionDetails) {
+        Assert.assertEquals("Unexpected raw length for " + i + "th partition", 0, tezIndexRecord.getRawLength());
+      } else {
+        Assert.assertEquals("Unexpected raw length for " + i + "th partition", 6, tezIndexRecord.getRawLength());
+      }
+    }
+  }
+
   void testPartitionStats(boolean withStats) throws IOException {
     conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS, withStats);
     OutputContext context = createTezOutputContext();
@@ -428,11 +543,11 @@ public class TestDefaultSorter {
     }
 
     TezCounter finalOutputBytes = context.getCounters().findCounter(TaskCounter.OUTPUT_BYTES_PHYSICAL);
-    assertTrue(finalOutputBytes.getValue() > 0);
+    assertTrue(finalOutputBytes.getValue() >= 0);
 
     TezCounter outputBytesWithOverheadCounter = context.getCounters().findCounter
         (TaskCounter.OUTPUT_BYTES_WITH_OVERHEAD);
-    assertTrue(outputBytesWithOverheadCounter.getValue() > 0);
+    assertTrue(outputBytesWithOverheadCounter.getValue() >= 0);
     verify(context, atLeastOnce()).notifyProgress();
   }
 


Mime
View raw message