tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rbalamo...@apache.org
Subject tez git commit: TEZ-3777. Avoid buffer copies by passing RLE flag to TezMerger from PipelinedSorter (rbalamohan)
Date Wed, 28 Jun 2017 07:19:24 GMT
Repository: tez
Updated Branches:
  refs/heads/master 020a7c873 -> 24475acc7


TEZ-3777. Avoid buffer copies by passing RLE flag to TezMerger from PipelinedSorter (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/24475acc
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/24475acc
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/24475acc

Branch: refs/heads/master
Commit: 24475acc772f23248938a89ff50766d88c4a7da7
Parents: 020a7c8
Author: Rajesh Balamohan <rbalamohan@apache.org>
Authored: Wed Jun 28 12:49:04 2017 +0530
Committer: Rajesh Balamohan <rbalamohan@apache.org>
Committed: Wed Jun 28 12:49:04 2017 +0530

----------------------------------------------------------------------
 .../common/sort/impl/PipelinedSorter.java       |   9 +-
 .../library/common/sort/impl/TezMerger.java     | 103 +++++++++++++------
 .../common/sort/impl/TestPipelinedSorter.java   |  52 ++++++++++
 3 files changed, 130 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/24475acc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index 90bb4d7..3d4d29b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -37,6 +37,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.runtime.library.api.IOInterruptedException;
 import org.slf4j.Logger;
@@ -767,7 +768,7 @@ public class PipelinedSorter extends ExternalSorter {
             (RawComparator) ConfigUtils.getIntermediateOutputKeyComparator(conf),
             progressable, sortSegments, true,
             null, spilledRecordsCounter, additionalSpillBytesRead,
-            null); // Not using any Progress in TezMerger. Should just work.
+            null, merger.needsRLE()); // Not using any Progress in TezMerger. Should just work.
         //write merged output to disk
         long segmentStart = finalOut.getPos();
         Writer writer =
@@ -1269,6 +1270,12 @@ public class PipelinedSorter extends ExternalSorter {
     }
   }
 
+  @InterfaceAudience.Private
+  @VisibleForTesting
+  public boolean needsRLE() {
+    return merger.needsRLE();
+  }
+
   private final class SpanMerger implements PartitionedRawKeyValueIterator {
     InputByteBuffer key = new InputByteBuffer();
     InputByteBuffer value = new InputByteBuffer();

http://git-wip-us.apache.org/repos/asf/tez/blob/24475acc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
index c811455..6eb9a40 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
@@ -130,6 +130,29 @@ public class TezMerger {
 
   public static <K extends Object, V extends Object>
   TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
+      Class keyClass, Class valueClass,
+      CompressionCodec codec,
+      List<Segment> segments,
+      int mergeFactor, Path tmpDir,
+      RawComparator comparator, Progressable reporter,
+      boolean sortSegments,
+      boolean considerFinalMergeForProgress,
+      TezCounter readsCounter,
+      TezCounter writesCounter,
+      TezCounter bytesReadCounter,
+      Progress mergePhase, boolean checkForSameKeys)
+      throws IOException, InterruptedException {
+    return new MergeQueue(conf, fs, segments, comparator, reporter,
+        sortSegments, codec, considerFinalMergeForProgress, checkForSameKeys).
+        merge(keyClass, valueClass,
+            mergeFactor, tmpDir,
+            readsCounter, writesCounter,
+            bytesReadCounter,
+            mergePhase);
+  }
+
+  public static <K extends Object, V extends Object>
+  TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
                              Class keyClass, Class valueClass,
                              CompressionCodec codec,
                              List<Segment> segments,
@@ -424,17 +447,18 @@ public class TezMerger {
   @VisibleForTesting
   static class MergeQueue<K extends Object, V extends Object>
   extends PriorityQueue<Segment> implements TezRawKeyValueIterator {
-    Configuration conf;
-    FileSystem fs;
-    CompressionCodec codec;
-    boolean ifileReadAhead = TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT;
-    int ifileReadAheadLength = TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT;
-    int ifileBufferSize = TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT;
-    long recordsBeforeProgress = TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT;
+    final Configuration conf;
+    final FileSystem fs;
+    final CompressionCodec codec;
+    final boolean checkForSameKeys;
+    static final boolean ifileReadAhead = TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT;
+    static final int ifileReadAheadLength = TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT;
+    static final int ifileBufferSize = TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT;
+    static final long recordsBeforeProgress = TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT;
     
     List<Segment> segments = new ArrayList<Segment>();
     
-    RawComparator comparator;
+    final RawComparator comparator;
 
     private long totalBytesProcessed;
     private float progPerByte;
@@ -444,7 +468,7 @@ public class TezMerger {
     // used in calculating mergeProgress.
     private final boolean considerFinalMergeForProgress;
 
-    Progressable reporter;
+    final Progressable reporter;
     
     final DataInputBuffer key = new DataInputBuffer();
     final DataInputBuffer value = new DataInputBuffer();
@@ -475,6 +499,7 @@ public class TezMerger {
                       TezCounter mergedMapOutputsCounter) 
     throws IOException {
       this.conf = conf;
+      this.checkForSameKeys = true;
       // this.recordsBeforeProgress =
       // conf.getLong(TezJobConfig.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS,
       // TezJobConfig.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
@@ -500,9 +525,25 @@ public class TezMerger {
       Collections.sort(segments, segmentComparator); 
     }
     
-    public MergeQueue(Configuration conf, FileSystem fs, 
+    public MergeQueue(Configuration conf, FileSystem fs,
         List<Segment> segments, RawComparator comparator,
         Progressable reporter, boolean sortSegments, boolean considerFinalMergeForProgress) {
+      this(conf, fs, segments, comparator, reporter, sortSegments, null,
+          considerFinalMergeForProgress);
+    }
+
+    public MergeQueue(Configuration conf, FileSystem fs,
+        List<Segment> segments, RawComparator comparator,
+        Progressable reporter, boolean sortSegments, CompressionCodec codec,
+        boolean considerFinalMergeForProgress) {
+      this(conf, fs, segments, comparator, reporter, sortSegments, codec,
+          considerFinalMergeForProgress, true);
+    }
+
+    public MergeQueue(Configuration conf, FileSystem fs,
+        List<Segment> segments, RawComparator comparator,
+        Progressable reporter, boolean sortSegments, CompressionCodec codec,
+        boolean considerFinalMergeForProgress, boolean checkForSameKeys) {
       this.conf = conf;
       this.fs = fs;
       this.comparator = comparator;
@@ -512,13 +553,7 @@ public class TezMerger {
       if (sortSegments) {
         Collections.sort(segments, segmentComparator);
       }
-    }
-
-    public MergeQueue(Configuration conf, FileSystem fs,
-        List<Segment> segments, RawComparator comparator,
-        Progressable reporter, boolean sortSegments, CompressionCodec codec,
-        boolean considerFinalMergeForProgress) {
-      this(conf, fs, segments, comparator, reporter, sortSegments, considerFinalMergeForProgress);
+      this.checkForSameKeys = checkForSameKeys;
       this.codec = codec;
     }
 
@@ -544,24 +579,26 @@ public class TezMerger {
 
     private void adjustPriorityQueue(Segment reader) throws IOException{
       long startPos = reader.getPosition();
-      if (hasNext == null) {
-        /**
-         * hasNext can be null during first iteration & prevKey is initialized here.
-         * In cases of NO_KEY/NEW_KEY, we readjust the queue later. If new segment/file is found
-         * during this process, we need to compare keys for RLE across segment boundaries.
-         * prevKey can't be empty at that time (e.g custom comparators)
-         */
-        populatePreviousKey();
-      } else {
-        //indicates a key has been read already
-        if (hasNext != KeyState.SAME_KEY) {
+      if (checkForSameKeys) {
+        if (hasNext == null) {
           /**
-           * Store previous key before reading next for later key comparisons.
-           * If all keys in a segment are unique, it would always hit this code path and key copies
-           * are wasteful in such condition, as these comparisons are mainly done for RLE.
-           * TODO: When better stats are available, this condition can be avoided.
+           * hasNext can be null during first iteration & prevKey is initialized here.
+           * In cases of NO_KEY/NEW_KEY, we readjust the queue later. If new segment/file is found
+           * during this process, we need to compare keys for RLE across segment boundaries.
+           * prevKey can't be empty at that time (e.g custom comparators)
            */
           populatePreviousKey();
+        } else {
+          //indicates a key has been read already
+          if (hasNext != KeyState.SAME_KEY) {
+            /**
+             * Store previous key before reading next for later key comparisons.
+             * If all keys in a segment are unique, it would always hit this code path and key copies
+             * are wasteful in such condition, as these comparisons are mainly done for RLE.
+             * TODO: When better stats are available, this condition can be avoided.
+             */
+            populatePreviousKey();
+          }
         }
       }
       hasNext = reader.readRawKey(nextKey);
@@ -589,7 +626,7 @@ public class TezMerger {
      */
     void compareKeyWithNextTopKey(Segment current) throws IOException {
       Segment nextTop = top();
-      if (nextTop != current) {
+      if (checkForSameKeys && nextTop != current) {
         //we have a different file. Compare it with previous key
         KeyValueBuffer nextKey = nextTop.getKey();
         int compare = compare(nextKey, prevKey);

http://git-wip-us.apache.org/repos/asf/tez/blob/24475acc/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
index e985292..15fae07 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
@@ -58,6 +58,7 @@ import java.util.Map;
 import java.util.TreeMap;
 import java.util.UUID;
 
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.anyListOf;
 import static org.mockito.Mockito.atLeastOnce;
@@ -367,6 +368,38 @@ public class TestPipelinedSorter {
     verifyCounters(sorter, outputContext);
   }
 
+  @Test
+  public void testMultipleSpills() throws IOException {
+    Configuration conf = getConf();
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true);
+    this.numOutputs = 5;
+    this.initialAvailableMem = 5 * 1024 * 1024;
+    conf.setInt(TezRuntimeConfiguration
+        .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 3);
+    PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
+        initialAvailableMem);
+
+    writeData(sorter, 25000, 1000);
+    assertFalse("Expecting needsRLE to be false", sorter.needsRLE());
+    verifyCounters(sorter, outputContext);
+  }
+
+  @Test
+  public void testMultipleSpills_WithRLE() throws IOException {
+    Configuration conf = getConf();
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true);
+    this.numOutputs = 5;
+    this.initialAvailableMem = 5 * 1024 * 1024;
+    conf.setInt(TezRuntimeConfiguration
+        .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 3);
+    PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
+        initialAvailableMem);
+
+    writeSimilarKeys(sorter, 25000, 1000, true);
+    assertTrue("Expecting needsRLE to be true", sorter.needsRLE());
+    verifyCounters(sorter, outputContext);
+  }
+
   public void basicTest2(int partitions, int[] numkeys, int[] keysize,
       long initialAvailableMem, int  blockSize) throws IOException {
     this.numOutputs = partitions; // single output
@@ -704,6 +737,25 @@ public class TestPipelinedSorter {
     writeData(sorter, numKeys, keyLen, true);
   }
 
+  // duplicate some of the keys
+  private void writeSimilarKeys(ExternalSorter sorter, int numKeys, int keyLen,
+      boolean autoClose) throws IOException {
+    sortedDataMap.clear();
+    String keyStr = RandomStringUtils.randomAlphanumeric(keyLen);
+    for (int i = 0; i < numKeys; i++) {
+      if (i % 4 == 0) {
+        keyStr = RandomStringUtils.randomAlphanumeric(keyLen);
+      }
+      Text key = new Text(keyStr);
+      Text value = new Text(RandomStringUtils.randomAlphanumeric(keyLen));
+      sorter.write(key, value);
+      sortedDataMap.put(key.toString(), value.toString()); //for verifying data later
+    }
+    if (autoClose) {
+      closeSorter(sorter);
+    }
+  }
+
   private void writeData(ExternalSorter sorter, int numKeys, int keyLen,
       boolean autoClose) throws IOException {
     sortedDataMap.clear();


Mime
View raw message