tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject tez git commit: TEZ-2328. Add tez.runtime.sorter.class & rename tez.runtime.sort.threads to tez.runtime.pipelined.sorter.sort.threads. (hitesh)
Date Mon, 20 Apr 2015 18:29:19 GMT
Repository: tez
Updated Branches:
  refs/heads/master 57c62f1f1 -> db0a50c5e


TEZ-2328. Add tez.runtime.sorter.class & rename tez.runtime.sort.threads to tez.runtime.pipelined.sorter.sort.threads.
(hitesh)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/db0a50c5
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/db0a50c5
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/db0a50c5

Branch: refs/heads/master
Commit: db0a50c5eaed0ab19a762658bc661d792d8b31a7
Parents: 57c62f1
Author: Hitesh Shah <hitesh@apache.org>
Authored: Mon Apr 20 11:28:57 2015 -0700
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Mon Apr 20 11:28:57 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../library/api/TezRuntimeConfiguration.java    | 29 +++++++--
 .../common/sort/impl/PipelinedSorter.java       |  4 +-
 .../conf/OrderedPartitionedKVOutputConfig.java  | 42 ++++++++++++-
 .../output/OrderedPartitionedKVOutput.java      | 34 +++++++---
 .../api/TestTezRuntimeConfiguration.java        |  2 +-
 .../common/shuffle/TestShuffleUtils.java        |  1 -
 .../common/sort/impl/TestPipelinedSorter.java   |  3 +-
 .../sort/impl/dflt/TestDefaultSorter.java       |  3 +-
 .../TestOrderedPartitionedKVOutputConfig.java   |  1 +
 .../library/output/TestOnFileSortedOutput.java  | 65 ++++++++++++++------
 11 files changed, 143 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/db0a50c5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9c86434..c44824c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,8 @@ INCOMPATIBLE CHANGES
   TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly
 
 ALL CHANGES:
+  TEZ-2328. Add tez.runtime.sorter.class & rename tez.runtime.sort.threads
+  to tez.runtime.pipelined.sorter.sort.threads.
   TEZ-2333. Enable local fetch optimization by default.
   TEZ-2310. Deadlock caused by StateChangeNotifier sending notifications on
   thread holding locks

http://git-wip-us.apache.org/repos/asf/tez/blob/db0a50c5/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
index b1341f6..a818de8 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
@@ -17,8 +17,6 @@
  */
 package org.apache.tez.runtime.library.api;
 
-
-
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -32,6 +30,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.runtime.library.conf.OrderedPartitionedKVOutputConfig.SorterImpl;
 
 /**
  * Meant for user configurable job properties.
@@ -62,6 +61,11 @@ public class TezRuntimeConfiguration {
   private static List<String> unmodifiableAllowedPrefixes;
 
 
+  static {
+    Configuration.addDeprecation("tez.runtime.sort.threads",
+        TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS);
+  }
+
   /**
    * Configuration key to enable/disable IFile readahead.
    */
@@ -113,9 +117,21 @@ public class TezRuntimeConfiguration {
   public static final int TEZ_RUNTIME_COMBINE_MIN_SPILLS_DEFAULT = 3;
 
 
-  public static final String TEZ_RUNTIME_SORT_THREADS = TEZ_RUNTIME_PREFIX +
-      "sort.threads";
-  public static final int TEZ_RUNTIME_SORT_THREADS_DEFAULT = 2;
+  /**
+   * String value.
+   * Which sorter implementation to use.
+   * Valid values:
+   *    - LEGACY
+   *    - PIPELINED ( default )
+   *    {@link org.apache.tez.runtime.library.conf.OrderedPartitionedKVOutputConfig.SorterImpl}
+   */
+  public static final String TEZ_RUNTIME_SORTER_CLASS = TEZ_RUNTIME_PREFIX +
+      "sorter.class";
+  public static final String TEZ_RUNTIME_SORTER_CLASS_DEFAULT = SorterImpl.PIPELINED.name();
+
+  public static final String TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS = TEZ_RUNTIME_PREFIX +
+      "pipelined.sorter.sort.threads";
+  public static final int TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS_DEFAULT = 2;
 
   /**
    * Size of the buffer to use if not writing directly to disk.
@@ -321,7 +337,7 @@ public class TezRuntimeConfiguration {
     tezRuntimeKeys.add(TEZ_RUNTIME_IO_SORT_MB);
     tezRuntimeKeys.add(TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES);
     tezRuntimeKeys.add(TEZ_RUNTIME_COMBINE_MIN_SPILLS);
-    tezRuntimeKeys.add(TEZ_RUNTIME_SORT_THREADS);
+    tezRuntimeKeys.add(TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS);
     tezRuntimeKeys.add(TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB);
     tezRuntimeKeys.add(TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES);
     tezRuntimeKeys.add(TEZ_RUNTIME_PARTITIONER_CLASS);
@@ -357,6 +373,7 @@ public class TezRuntimeConfiguration {
     tezRuntimeKeys.add(TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH);
     tezRuntimeKeys.add(TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH);
     tezRuntimeKeys.add(TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT);
+    tezRuntimeKeys.add(TEZ_RUNTIME_SORTER_CLASS);
 
     defaultConf.addResource("core-default.xml");
     defaultConf.addResource("core-site.xml");

http://git-wip-us.apache.org/repos/asf/tez/blob/db0a50c5/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index 0b959cd..65606bf 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -177,8 +177,8 @@ public class PipelinedSorter extends ExternalSorter {
     merger = new SpanMerger(); // SpanIterators are comparable
     final int sortThreads = 
             this.conf.getInt(
-                TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS, 
-                TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS_DEFAULT);
+                TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS,
+                TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS_DEFAULT);
     sortmaster = Executors.newFixedThreadPool(sortThreads,
         new ThreadFactoryBuilder().setDaemon(true)
         .setNameFormat("Sorter [" + TezUtilsInternal

http://git-wip-us.apache.org/repos/asf/tez/blob/db0a50c5/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVOutputConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVOutputConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVOutputConfig.java
index 5437620..3281617 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVOutputConfig.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVOutputConfig.java
@@ -51,6 +51,19 @@ import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;
 public class OrderedPartitionedKVOutputConfig {
 
   /**
+   * Currently supported sorter implementations
+   */
+  public enum SorterImpl {
+    /** Legacy sorter implementation based on Hadoop MapReduce shuffle impl.
+     * Restricted to 2 GB memory limits.
+     */
+    LEGACY,
+    /** Pipeline sorter - a more efficient sorter that supports > 2 GB sort buffers */
+    PIPELINED
+  }
+
+
+  /**
    * Configure parameters which are specific to the Output.
    */
   @InterfaceAudience.Private
@@ -85,8 +98,6 @@ public class OrderedPartitionedKVOutputConfig {
      */
     public T setCombiner(String combinerClassName, @Nullable Map<String, String> combinerConf);
 
-
-
     /**
      * Configure the number of threads to be used by the sorter
      *
@@ -94,6 +105,15 @@ public class OrderedPartitionedKVOutputConfig {
      * @return instance of the current builder
      */
     public T setSorterNumThreads(int numThreads);
+
+    /**
+     * Configure which sorter implementation to be used
+     *
+     * @param sorterImpl Use an in-built sorter implementations.
+     * @return instance of the current builder
+     */
+    public T setSorter(SorterImpl sorterImpl);
+
   }
 
   @SuppressWarnings("rawtypes")
@@ -133,6 +153,13 @@ public class OrderedPartitionedKVOutputConfig {
     }
 
     @Override
+    public SpecificBuilder setSorter(SorterImpl sorterImpl) {
+      builder.setSorter(sorterImpl);
+      return this;
+    }
+
+
+    @Override
     public SpecificBuilder<E> setAdditionalConfiguration(String key, String value) {
       builder.setAdditionalConfiguration(key, value);
       return this;
@@ -294,10 +321,19 @@ public class OrderedPartitionedKVOutputConfig {
 
     @Override
     public Builder setSorterNumThreads(int numThreads) {
-      this.conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS, numThreads);
+      this.conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS, numThreads);
+      return this;
+    }
+
+    @Override
+    public Builder setSorter(SorterImpl sorterImpl) {
+      Preconditions.checkNotNull(sorterImpl, "Sorter cannot be null");
+      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_SORTER_CLASS,
+          sorterImpl.name());
       return this;
     }
 
+
     @Override
     public Builder setAdditionalConfiguration(String key, String value) {
       Preconditions.checkNotNull(key, "Key cannot be null");

http://git-wip-us.apache.org/repos/asf/tez/blob/db0a50c5/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
index b03c674..df6daf2 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
@@ -18,15 +18,18 @@
 package org.apache.tez.runtime.library.output;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Locale;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 
+import org.apache.tez.runtime.library.conf.OrderedPartitionedKVOutputConfig.SorterImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -102,8 +105,17 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
   public synchronized void start() throws Exception {
     if (!isStarted.get()) {
       memoryUpdateCallbackHandler.validateUpdateReceived();
-      int sortThreads = this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS,
-          TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS_DEFAULT);
+      String sorterClass = conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_SORTER_CLASS,
+          TezRuntimeConfiguration.TEZ_RUNTIME_SORTER_CLASS_DEFAULT).toUpperCase(Locale.ENGLISH);
+      SorterImpl sorterImpl = null;
+      try {
+        sorterImpl = SorterImpl.valueOf(sorterClass);
+      } catch (IllegalArgumentException e) {
+        throw new IllegalArgumentException("Invalid sorter class specified in config"
+            + ", propertyName=" + TezRuntimeConfiguration.TEZ_RUNTIME_SORTER_CLASS
+            + ", value=" + sorterClass
+            + ", validValues=" + Arrays.asList(SorterImpl.values()));
+      }
 
       finalMergeEnabled = conf.getBoolean(
           TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT,
@@ -121,17 +133,22 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
           conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false);
         }
 
-        //TODO: Enable it for pipelinedsorter only and not for DefaultSorter
-        Preconditions.checkArgument((sortThreads > 1), TezRuntimeConfiguration
-            .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED + " works with PipelinedSorter.");
+        Preconditions.checkArgument(sorterImpl.equals(SorterImpl.PIPELINED),
+            TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED
+              + "only works with PipelinedSorter.");
       }
 
-      if (sortThreads > 1) {
+      if (sorterImpl.equals(SorterImpl.PIPELINED)) {
         sorter = new PipelinedSorter(getContext(), conf, getNumPhysicalOutputs(),
             memoryUpdateCallbackHandler.getMemoryAssigned());
-      } else {
+      } else if (sorterImpl.equals(SorterImpl.LEGACY)) {
         sorter = new DefaultSorter(getContext(), conf, getNumPhysicalOutputs(),
             memoryUpdateCallbackHandler.getMemoryAssigned());
+      } else {
+        throw new UnsupportedOperationException("Unsupported sorter class specified in config"
+            + ", propertyName=" + TezRuntimeConfiguration.TEZ_RUNTIME_SORTER_CLASS
+            + ", value=" + sorterClass
+            + ", validValues=" + Arrays.asList(SorterImpl.values()));
       }
 
       isStarted.set(true);
@@ -202,7 +219,7 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINE_MIN_SPILLS);
-    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_INTERNAL_SORTER_CLASS);
@@ -219,6 +236,7 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
     confKeys.add(TezConfiguration.TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH);
     confKeys.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH);
     confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SORTER_CLASS);
   }
 
   // TODO Maybe add helper methods to extract keys

http://git-wip-us.apache.org/repos/asf/tez/blob/db0a50c5/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/api/TestTezRuntimeConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/api/TestTezRuntimeConfiguration.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/api/TestTezRuntimeConfiguration.java
index fcaebd9..8e97b12 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/api/TestTezRuntimeConfiguration.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/api/TestTezRuntimeConfiguration.java
@@ -27,9 +27,9 @@ import java.lang.reflect.Field;
 import java.util.HashSet;
 import java.util.Set;
 
-import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.junit.Test;
 
+
 public class TestTezRuntimeConfiguration {
 
 

http://git-wip-us.apache.org/repos/asf/tez/blob/db0a50c5/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
index f68032f..2e264f6 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
@@ -103,7 +103,6 @@ public class TestShuffleUtils {
   public void setup() throws Exception {
     outputContext = createTezOutputContext();
     conf = new Configuration();
-    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS, 1); // DefaultSorter
     conf.set("fs.defaultFS", "file:///");
     localFs = FileSystem.getLocal(conf);
 

http://git-wip-us.apache.org/repos/asf/tez/blob/db0a50c5/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
index 073d956..6e56567 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
@@ -19,6 +19,7 @@ import org.apache.tez.runtime.api.OutputContext;
 import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
+import org.apache.tez.runtime.library.conf.OrderedPartitionedKVOutputConfig.SorterImpl;
 import org.apache.tez.runtime.library.partitioner.HashPartitioner;
 import org.junit.After;
 import org.junit.Assert;
@@ -89,7 +90,7 @@ public class TestPipelinedSorter {
     this.outputContext = createMockOutputContext(counters, appId, uniqueId);
 
     //To enable PipelinedSorter, set 2 threads
-    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS, 2);
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_SORTER_CLASS, SorterImpl.PIPELINED.name());
     conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName());
     conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName());
     conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS,

http://git-wip-us.apache.org/repos/asf/tez/blob/db0a50c5/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
index 408ec3b..70dce13 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java
@@ -54,6 +54,7 @@ import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.apache.tez.runtime.library.common.sort.impl.ExternalSorter;
+import org.apache.tez.runtime.library.conf.OrderedPartitionedKVOutputConfig.SorterImpl;
 import org.apache.tez.runtime.library.partitioner.HashPartitioner;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
 import org.junit.After;
@@ -76,7 +77,7 @@ public class TestDefaultSorter {
   @Before
   public void setup() throws IOException {
     conf = new Configuration();
-    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS, 1); // DefaultSorter
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_SORTER_CLASS, SorterImpl.LEGACY.name()); // DefaultSorter
     conf.set("fs.defaultFS", "file:///");
     localFs = FileSystem.getLocal(conf);
 

http://git-wip-us.apache.org/repos/asf/tez/blob/db0a50c5/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVOutputConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVOutputConfig.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVOutputConfig.java
index ac31cf5..891e807 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVOutputConfig.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVOutputConfig.java
@@ -31,6 +31,7 @@ import java.util.Map;
 import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+
 import org.junit.Test;
 
 public class TestOrderedPartitionedKVOutputConfig {

http://git-wip-us.apache.org/repos/asf/tez/blob/db0a50c5/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
index 8e43f21..30f78fe 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
@@ -39,10 +39,12 @@ import org.apache.tez.runtime.library.api.KeyValuesWriter;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
 import org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter;
+import org.apache.tez.runtime.library.conf.OrderedPartitionedKVOutputConfig.SorterImpl;
 import org.apache.tez.runtime.library.partitioner.HashPartitioner;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -84,6 +86,7 @@ public class TestOnFileSortedOutput {
   //no of outputs
   private int partitions;
   //For sorter (pipelined / Default)
+  private SorterImpl sorterImpl;
   private int sorterThreads;
   
   final AtomicLong outputSize = new AtomicLong();
@@ -98,14 +101,16 @@ public class TestOnFileSortedOutput {
    * Constructor
    *
    * @param sendEmptyPartitionViaEvent
-   * @param threads number of threads needed for sorter (pipelinedsorter or default sorter)
+   * @param sorterImpl Which sorter impl ( pipeline/legacy )
+   * @param sorterThreads number of threads needed for sorter (required only for pipelined sorter)
    * @param emptyPartitionIdx for which data should not be generated
    */
-  public TestOnFileSortedOutput(boolean sendEmptyPartitionViaEvent, int threads,
-      int emptyPartitionIdx) throws IOException {
+  public TestOnFileSortedOutput(boolean sendEmptyPartitionViaEvent, SorterImpl sorterImpl,
+      int sorterThreads, int emptyPartitionIdx) throws IOException {
     this.sendEmptyPartitionViaEvent = sendEmptyPartitionViaEvent;
     this.emptyPartitionIdx = emptyPartitionIdx;
-    this.sorterThreads = threads;
+    this.sorterImpl = sorterImpl;
+    this.sorterThreads = sorterThreads;
 
     conf = new Configuration();
 
@@ -117,7 +122,8 @@ public class TestOnFileSortedOutput {
 
   @Before
   public void setup() throws Exception {
-    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS, sorterThreads);
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_SORTER_CLASS, sorterImpl.name());
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS, sorterThreads);
     conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 5);
 
     conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName());
@@ -138,20 +144,20 @@ public class TestOnFileSortedOutput {
     fs.delete(workingDir, true);
   }
 
-  @Parameterized.Parameters(name = "test[{0}, {1}, {2}]")
+  @Parameterized.Parameters(name = "test[{0}, {1}, {2}, {3}]")
   public static Collection<Object[]> getParameters() {
     Collection<Object[]> parameters = new ArrayList<Object[]>();
     //empty_partition_via_events_enabled, noOfSortThreads, partitionToBeEmpty
-    parameters.add(new Object[] { false, 1, -1 });
-    parameters.add(new Object[] { false, 1, 0 });
-    parameters.add(new Object[] { true, 1, -1 });
-    parameters.add(new Object[] { true, 1, 0 });
+    parameters.add(new Object[] { false, SorterImpl.LEGACY, 1, -1 });
+    parameters.add(new Object[] { false, SorterImpl.LEGACY, 1, 0 });
+    parameters.add(new Object[] { true, SorterImpl.LEGACY, 1, -1 });
+    parameters.add(new Object[] { true, SorterImpl.LEGACY, 1, 0 });
 
     //Pipelined sorter
-    parameters.add(new Object[] { false, 2, -1 });
-    parameters.add(new Object[] { false, 2, 0 });
-    parameters.add(new Object[] { true, 2, -1 });
-    parameters.add(new Object[] { true, 2, 0 });
+    parameters.add(new Object[] { false, SorterImpl.PIPELINED, 2, -1 });
+    parameters.add(new Object[] { false, SorterImpl.PIPELINED, 2, 0 });
+    parameters.add(new Object[] { true, SorterImpl.PIPELINED, 2, -1 });
+    parameters.add(new Object[] { true, SorterImpl.PIPELINED, 2, 0 });
 
     return parameters;
   }
@@ -167,11 +173,10 @@ public class TestOnFileSortedOutput {
     writer = sortedOutput.getWriter();
   }
 
-  @Test (timeout = 5000)
-  public void testPipelinedShuffle() throws Exception {
+  private void _testPipelinedShuffle(String sorterName) throws Exception {
     conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 3);
 
-    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS, 2);
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_SORTER_CLASS, sorterName);
     conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false);
     conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, true);
     OutputContext context = createTezOutputContext();
@@ -184,14 +189,18 @@ public class TestOnFileSortedOutput {
 
     assertFalse(sortedOutput.finalMergeEnabled);
     assertTrue(sortedOutput.pipelinedShuffle);
+  }
 
+  @Test (timeout = 5000)
+  public void testPipelinedShuffle() throws Exception {
+    _testPipelinedShuffle(SorterImpl.PIPELINED.name());
   }
 
   @Test (timeout = 5000)
   public void testPipelinedShuffleWithFinalMerge() throws Exception {
     conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 3);
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_SORTER_CLASS, SorterImpl.PIPELINED.name());
 
-    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS, 2);
     //wrong setting for final merge enable in output
     conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true);
     conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, true);
@@ -209,8 +218,8 @@ public class TestOnFileSortedOutput {
   @Test
   public void testPipelinedSettingsWithDefaultSorter() throws Exception {
     conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 3);
-    //negative. with sort threads-1
-    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS, 1);
+    //negative. with default sorter
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_SORTER_CLASS, SorterImpl.LEGACY.name());
     conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false);
     conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, true);
 
@@ -376,4 +385,20 @@ public class TestOnFileSortedOutput {
     return context;
   }
 
+  @Test(timeout=5000)
+  public void testInvalidSorter() throws Exception {
+    try {
+      _testPipelinedShuffle("Foo");
+      Assert.fail("Expected start to fail due to invalid sorter");
+    } catch (IllegalArgumentException e) {
+      // Expected
+    }
+  }
+
+  @Test(timeout=5000)
+  public void testLowerCaseNamedSorter() throws Exception {
+    _testPipelinedShuffle("Pipelined");
+  }
+
+
 }


Mime
View raw message