Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5BD6E17AD8 for ; Fri, 24 Apr 2015 00:25:56 +0000 (UTC) Received: (qmail 36289 invoked by uid 500); 24 Apr 2015 00:25:56 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 36122 invoked by uid 500); 24 Apr 2015 00:25:56 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 35086 invoked by uid 99); 24 Apr 2015 00:25:55 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 24 Apr 2015 00:25:55 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 81CF6E1806; Fri, 24 Apr 2015 00:25:55 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.apache.org Date: Fri, 24 Apr 2015 00:26:12 -0000 Message-Id: <52d66d0cc18c4fbcb1f1901b9df6ec41@git.apache.org> In-Reply-To: <3ae8c698c8aa44f5b905b43cdcaac9f0@git.apache.org> References: <3ae8c698c8aa44f5b905b43cdcaac9f0@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [18/50] [abbrv] tez git commit: TEZ-2328. Add tez.runtime.sorter.class & rename tez.runtime.sort.threads to tez.runtime.pipelined.sorter.sort.threads. (hitesh) TEZ-2328. Add tez.runtime.sorter.class & rename tez.runtime.sort.threads to tez.runtime.pipelined.sorter.sort.threads. 
(hitesh) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/db0a50c5 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/db0a50c5 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/db0a50c5 Branch: refs/heads/TEZ-2003 Commit: db0a50c5eaed0ab19a762658bc661d792d8b31a7 Parents: 57c62f1 Author: Hitesh Shah Authored: Mon Apr 20 11:28:57 2015 -0700 Committer: Hitesh Shah Committed: Mon Apr 20 11:28:57 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../library/api/TezRuntimeConfiguration.java | 29 +++++++-- .../common/sort/impl/PipelinedSorter.java | 4 +- .../conf/OrderedPartitionedKVOutputConfig.java | 42 ++++++++++++- .../output/OrderedPartitionedKVOutput.java | 34 +++++++--- .../api/TestTezRuntimeConfiguration.java | 2 +- .../common/shuffle/TestShuffleUtils.java | 1 - .../common/sort/impl/TestPipelinedSorter.java | 3 +- .../sort/impl/dflt/TestDefaultSorter.java | 3 +- .../TestOrderedPartitionedKVOutputConfig.java | 1 + .../library/output/TestOnFileSortedOutput.java | 65 ++++++++++++++------ 11 files changed, 143 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/db0a50c5/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 9c86434..c44824c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,6 +9,8 @@ INCOMPATIBLE CHANGES TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly ALL CHANGES: + TEZ-2328. Add tez.runtime.sorter.class & rename tez.runtime.sort.threads + to tez.runtime.pipelined.sorter.sort.threads. TEZ-2333. Enable local fetch optimization by default. TEZ-2310. 
Deadlock caused by StateChangeNotifier sending notifications on thread holding locks http://git-wip-us.apache.org/repos/asf/tez/blob/db0a50c5/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java index b1341f6..a818de8 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java @@ -17,8 +17,6 @@ */ package org.apache.tez.runtime.library.api; - - import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -32,6 +30,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; +import org.apache.tez.runtime.library.conf.OrderedPartitionedKVOutputConfig.SorterImpl; /** * Meant for user configurable job properties. @@ -62,6 +61,11 @@ public class TezRuntimeConfiguration { private static List unmodifiableAllowedPrefixes; + static { + Configuration.addDeprecation("tez.runtime.sort.threads", + TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS); + } + /** * Configuration key to enable/disable IFile readahead. */ @@ -113,9 +117,21 @@ public class TezRuntimeConfiguration { public static final int TEZ_RUNTIME_COMBINE_MIN_SPILLS_DEFAULT = 3; - public static final String TEZ_RUNTIME_SORT_THREADS = TEZ_RUNTIME_PREFIX + - "sort.threads"; - public static final int TEZ_RUNTIME_SORT_THREADS_DEFAULT = 2; + /** + * String value. + * Which sorter implementation to use. 
+ * Valid values: + * - LEGACY + * - PIPELINED ( default ) + * {@link org.apache.tez.runtime.library.conf.OrderedPartitionedKVOutputConfig.SorterImpl} + */ + public static final String TEZ_RUNTIME_SORTER_CLASS = TEZ_RUNTIME_PREFIX + + "sorter.class"; + public static final String TEZ_RUNTIME_SORTER_CLASS_DEFAULT = SorterImpl.PIPELINED.name(); + + public static final String TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS = TEZ_RUNTIME_PREFIX + + "pipelined.sorter.sort.threads"; + public static final int TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS_DEFAULT = 2; /** * Size of the buffer to use if not writing directly to disk. @@ -321,7 +337,7 @@ public class TezRuntimeConfiguration { tezRuntimeKeys.add(TEZ_RUNTIME_IO_SORT_MB); tezRuntimeKeys.add(TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES); tezRuntimeKeys.add(TEZ_RUNTIME_COMBINE_MIN_SPILLS); - tezRuntimeKeys.add(TEZ_RUNTIME_SORT_THREADS); + tezRuntimeKeys.add(TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS); tezRuntimeKeys.add(TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB); tezRuntimeKeys.add(TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES); tezRuntimeKeys.add(TEZ_RUNTIME_PARTITIONER_CLASS); @@ -357,6 +373,7 @@ public class TezRuntimeConfiguration { tezRuntimeKeys.add(TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH); tezRuntimeKeys.add(TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH); tezRuntimeKeys.add(TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT); + tezRuntimeKeys.add(TEZ_RUNTIME_SORTER_CLASS); defaultConf.addResource("core-default.xml"); defaultConf.addResource("core-site.xml"); http://git-wip-us.apache.org/repos/asf/tez/blob/db0a50c5/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java index 0b959cd..65606bf 100644 
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java @@ -177,8 +177,8 @@ public class PipelinedSorter extends ExternalSorter { merger = new SpanMerger(); // SpanIterators are comparable final int sortThreads = this.conf.getInt( - TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS, - TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS_DEFAULT); + TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS, + TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS_DEFAULT); sortmaster = Executors.newFixedThreadPool(sortThreads, new ThreadFactoryBuilder().setDaemon(true) .setNameFormat("Sorter [" + TezUtilsInternal http://git-wip-us.apache.org/repos/asf/tez/blob/db0a50c5/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVOutputConfig.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVOutputConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVOutputConfig.java index 5437620..3281617 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVOutputConfig.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVOutputConfig.java @@ -51,6 +51,19 @@ import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput; public class OrderedPartitionedKVOutputConfig { /** + * Currently supported sorter implementations + */ + public enum SorterImpl { + /** Legacy sorter implementation based on Hadoop MapReduce shuffle impl. + * Restricted to 2 GB memory limits. 
+ */ + LEGACY, + /** Pipelined sorter - a more efficient sorter that supports > 2 GB sort buffers */ + PIPELINED + } + + + /** * Configure parameters which are specific to the Output. */ @InterfaceAudience.Private @@ -85,8 +98,6 @@ public class OrderedPartitionedKVOutputConfig { */ public T setCombiner(String combinerClassName, @Nullable Map combinerConf); - - /** * Configure the number of threads to be used by the sorter * @@ -94,6 +105,15 @@ public class OrderedPartitionedKVOutputConfig { * @return instance of the current builder */ public T setSorterNumThreads(int numThreads); + + /** + * Configure which sorter implementation is to be used + * + * @param sorterImpl One of the built-in sorter implementations to use. + * @return instance of the current builder + */ + public T setSorter(SorterImpl sorterImpl); + } @SuppressWarnings("rawtypes") @@ -133,6 +153,13 @@ public class OrderedPartitionedKVOutputConfig { } @Override + public SpecificBuilder setSorter(SorterImpl sorterImpl) { + builder.setSorter(sorterImpl); + return this; + } + + + @Override public SpecificBuilder setAdditionalConfiguration(String key, String value) { builder.setAdditionalConfiguration(key, value); return this; @@ -294,10 +321,19 @@ public class OrderedPartitionedKVOutputConfig { @Override public Builder setSorterNumThreads(int numThreads) { - this.conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS, numThreads); + this.conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS, numThreads); + return this; + } + + @Override + public Builder setSorter(SorterImpl sorterImpl) { + Preconditions.checkNotNull(sorterImpl, "Sorter cannot be null"); + this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_SORTER_CLASS, + sorterImpl.name()); return this; } + @Override public Builder setAdditionalConfiguration(String key, String value) { Preconditions.checkNotNull(key, "Key cannot be null"); 
http://git-wip-us.apache.org/repos/asf/tez/blob/db0a50c5/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java index b03c674..df6daf2 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java @@ -18,15 +18,18 @@ package org.apache.tez.runtime.library.output; import java.io.IOException; +import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Locale; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; +import org.apache.tez.runtime.library.conf.OrderedPartitionedKVOutputConfig.SorterImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -102,8 +105,17 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput { public synchronized void start() throws Exception { if (!isStarted.get()) { memoryUpdateCallbackHandler.validateUpdateReceived(); - int sortThreads = this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS, - TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS_DEFAULT); + String sorterClass = conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_SORTER_CLASS, + TezRuntimeConfiguration.TEZ_RUNTIME_SORTER_CLASS_DEFAULT).toUpperCase(Locale.ENGLISH); + SorterImpl sorterImpl = null; + try { + sorterImpl = SorterImpl.valueOf(sorterClass); + } catch (IllegalArgumentException e) { + throw new 
IllegalArgumentException("Invalid sorter class specified in config" + + ", propertyName=" + TezRuntimeConfiguration.TEZ_RUNTIME_SORTER_CLASS + + ", value=" + sorterClass + + ", validValues=" + Arrays.asList(SorterImpl.values())); + } finalMergeEnabled = conf.getBoolean( TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, @@ -121,17 +133,22 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput { conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false); } - //TODO: Enable it for pipelinedsorter only and not for DefaultSorter - Preconditions.checkArgument((sortThreads > 1), TezRuntimeConfiguration - .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED + " works with PipelinedSorter."); + Preconditions.checkArgument(sorterImpl.equals(SorterImpl.PIPELINED), + TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED + + "only works with PipelinedSorter."); } - if (sortThreads > 1) { + if (sorterImpl.equals(SorterImpl.PIPELINED)) { sorter = new PipelinedSorter(getContext(), conf, getNumPhysicalOutputs(), memoryUpdateCallbackHandler.getMemoryAssigned()); - } else { + } else if (sorterImpl.equals(SorterImpl.LEGACY)) { sorter = new DefaultSorter(getContext(), conf, getNumPhysicalOutputs(), memoryUpdateCallbackHandler.getMemoryAssigned()); + } else { + throw new UnsupportedOperationException("Unsupported sorter class specified in config" + + ", propertyName=" + TezRuntimeConfiguration.TEZ_RUNTIME_SORTER_CLASS + + ", value=" + sorterClass + + ", validValues=" + Arrays.asList(SorterImpl.values())); } isStarted.set(true); @@ -202,7 +219,7 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput { confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINE_MIN_SPILLS); - confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS); + 
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_INTERNAL_SORTER_CLASS); @@ -219,6 +236,7 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput { confKeys.add(TezConfiguration.TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH); confKeys.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH); confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS); + confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SORTER_CLASS); } // TODO Maybe add helper methods to extract keys http://git-wip-us.apache.org/repos/asf/tez/blob/db0a50c5/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/api/TestTezRuntimeConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/api/TestTezRuntimeConfiguration.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/api/TestTezRuntimeConfiguration.java index fcaebd9..8e97b12 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/api/TestTezRuntimeConfiguration.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/api/TestTezRuntimeConfiguration.java @@ -27,9 +27,9 @@ import java.lang.reflect.Field; import java.util.HashSet; import java.util.Set; -import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.junit.Test; + public class TestTezRuntimeConfiguration { http://git-wip-us.apache.org/repos/asf/tez/blob/db0a50c5/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java index f68032f..2e264f6 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java @@ -103,7 +103,6 @@ public class TestShuffleUtils { public void setup() throws Exception { outputContext = createTezOutputContext(); conf = new Configuration(); - conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS, 1); // DefaultSorter conf.set("fs.defaultFS", "file:///"); localFs = FileSystem.getLocal(conf); http://git-wip-us.apache.org/repos/asf/tez/blob/db0a50c5/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java index 073d956..6e56567 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java @@ -19,6 +19,7 @@ import org.apache.tez.runtime.api.OutputContext; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; +import org.apache.tez.runtime.library.conf.OrderedPartitionedKVOutputConfig.SorterImpl; import org.apache.tez.runtime.library.partitioner.HashPartitioner; import org.junit.After; import org.junit.Assert; @@ -89,7 +90,7 @@ public class TestPipelinedSorter { this.outputContext = createMockOutputContext(counters, appId, uniqueId); //To enable 
PipelinedSorter, set 2 threads - conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS, 2); + conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_SORTER_CLASS, SorterImpl.PIPELINED.name()); conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName()); conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName()); conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, http://git-wip-us.apache.org/repos/asf/tez/blob/db0a50c5/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java index 408ec3b..70dce13 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java @@ -54,6 +54,7 @@ import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.apache.tez.runtime.library.common.sort.impl.ExternalSorter; +import org.apache.tez.runtime.library.conf.OrderedPartitionedKVOutputConfig.SorterImpl; import org.apache.tez.runtime.library.partitioner.HashPartitioner; import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads; import org.junit.After; @@ -76,7 +77,7 @@ public class TestDefaultSorter { @Before public void setup() throws IOException { conf = new Configuration(); - conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS, 1); // DefaultSorter + conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_SORTER_CLASS, SorterImpl.LEGACY.name()); 
// DefaultSorter conf.set("fs.defaultFS", "file:///"); localFs = FileSystem.getLocal(conf); http://git-wip-us.apache.org/repos/asf/tez/blob/db0a50c5/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVOutputConfig.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVOutputConfig.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVOutputConfig.java index ac31cf5..891e807 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVOutputConfig.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVOutputConfig.java @@ -31,6 +31,7 @@ import java.util.Map; import com.google.common.collect.Maps; import org.apache.hadoop.conf.Configuration; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; + import org.junit.Test; public class TestOrderedPartitionedKVOutputConfig { http://git-wip-us.apache.org/repos/asf/tez/blob/db0a50c5/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java index 8e43f21..30f78fe 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java @@ -39,10 +39,12 @@ import org.apache.tez.runtime.library.api.KeyValuesWriter; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import 
org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler; import org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter; +import org.apache.tez.runtime.library.conf.OrderedPartitionedKVOutputConfig.SorterImpl; import org.apache.tez.runtime.library.partitioner.HashPartitioner; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -84,6 +86,7 @@ public class TestOnFileSortedOutput { //no of outputs private int partitions; //For sorter (pipelined / Default) + private SorterImpl sorterImpl; private int sorterThreads; final AtomicLong outputSize = new AtomicLong(); @@ -98,14 +101,16 @@ public class TestOnFileSortedOutput { * Constructor * * @param sendEmptyPartitionViaEvent - * @param threads number of threads needed for sorter (pipelinedsorter or default sorter) + * @param sorterImpl Which sorter impl ( pipeline/legacy ) + * @param sorterThreads number of threads needed for sorter (required only for pipelined sorter) * @param emptyPartitionIdx for which data should not be generated */ - public TestOnFileSortedOutput(boolean sendEmptyPartitionViaEvent, int threads, - int emptyPartitionIdx) throws IOException { + public TestOnFileSortedOutput(boolean sendEmptyPartitionViaEvent, SorterImpl sorterImpl, + int sorterThreads, int emptyPartitionIdx) throws IOException { this.sendEmptyPartitionViaEvent = sendEmptyPartitionViaEvent; this.emptyPartitionIdx = emptyPartitionIdx; - this.sorterThreads = threads; + this.sorterImpl = sorterImpl; + this.sorterThreads = sorterThreads; conf = new Configuration(); @@ -117,7 +122,8 @@ public class TestOnFileSortedOutput { @Before public void setup() throws Exception { - conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS, sorterThreads); + 
conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_SORTER_CLASS, sorterImpl.name()); + conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS, sorterThreads); conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 5); conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName()); @@ -138,20 +144,20 @@ public class TestOnFileSortedOutput { fs.delete(workingDir, true); } - @Parameterized.Parameters(name = "test[{0}, {1}, {2}]") + @Parameterized.Parameters(name = "test[{0}, {1}, {2}, {3}]") public static Collection getParameters() { Collection parameters = new ArrayList(); //empty_partition_via_events_enabled, noOfSortThreads, partitionToBeEmpty - parameters.add(new Object[] { false, 1, -1 }); - parameters.add(new Object[] { false, 1, 0 }); - parameters.add(new Object[] { true, 1, -1 }); - parameters.add(new Object[] { true, 1, 0 }); + parameters.add(new Object[] { false, SorterImpl.LEGACY, 1, -1 }); + parameters.add(new Object[] { false, SorterImpl.LEGACY, 1, 0 }); + parameters.add(new Object[] { true, SorterImpl.LEGACY, 1, -1 }); + parameters.add(new Object[] { true, SorterImpl.LEGACY, 1, 0 }); //Pipelined sorter - parameters.add(new Object[] { false, 2, -1 }); - parameters.add(new Object[] { false, 2, 0 }); - parameters.add(new Object[] { true, 2, -1 }); - parameters.add(new Object[] { true, 2, 0 }); + parameters.add(new Object[] { false, SorterImpl.PIPELINED, 2, -1 }); + parameters.add(new Object[] { false, SorterImpl.PIPELINED, 2, 0 }); + parameters.add(new Object[] { true, SorterImpl.PIPELINED, 2, -1 }); + parameters.add(new Object[] { true, SorterImpl.PIPELINED, 2, 0 }); return parameters; } @@ -167,11 +173,10 @@ public class TestOnFileSortedOutput { writer = sortedOutput.getWriter(); } - @Test (timeout = 5000) - public void testPipelinedShuffle() throws Exception { + private void _testPipelinedShuffle(String sorterName) throws Exception { conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 3); - 
conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS, 2); + conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_SORTER_CLASS, sorterName); conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false); conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, true); OutputContext context = createTezOutputContext(); @@ -184,14 +189,18 @@ public class TestOnFileSortedOutput { assertFalse(sortedOutput.finalMergeEnabled); assertTrue(sortedOutput.pipelinedShuffle); + } + @Test (timeout = 5000) + public void testPipelinedShuffle() throws Exception { + _testPipelinedShuffle(SorterImpl.PIPELINED.name()); } @Test (timeout = 5000) public void testPipelinedShuffleWithFinalMerge() throws Exception { conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 3); + conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_SORTER_CLASS, SorterImpl.PIPELINED.name()); - conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS, 2); //wrong setting for final merge enable in output conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true); conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, true); @@ -209,8 +218,8 @@ public class TestOnFileSortedOutput { @Test public void testPipelinedSettingsWithDefaultSorter() throws Exception { conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 3); - //negative. with sort threads-1 - conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS, 1); + //negative. 
with default sorter + conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_SORTER_CLASS, SorterImpl.LEGACY.name()); conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false); conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, true); @@ -376,4 +385,20 @@ public class TestOnFileSortedOutput { return context; } + @Test(timeout=5000) + public void testInvalidSorter() throws Exception { + try { + _testPipelinedShuffle("Foo"); + Assert.fail("Expected start to fail due to invalid sorter"); + } catch (IllegalArgumentException e) { + // Expected + } + } + + @Test(timeout=5000) + public void testLowerCaseNamedSorter() throws Exception { + _testPipelinedShuffle("Pipelined"); + } + + }