Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D32AA11056 for ; Mon, 7 Jul 2014 21:52:19 +0000 (UTC) Received: (qmail 27890 invoked by uid 500); 7 Jul 2014 21:52:19 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 27853 invoked by uid 500); 7 Jul 2014 21:52:19 -0000 Mailing-List: contact commits-help@tez.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.incubator.apache.org Delivered-To: mailing list commits@tez.incubator.apache.org Received: (qmail 27844 invoked by uid 99); 7 Jul 2014 21:52:19 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 07 Jul 2014 21:52:19 +0000 X-ASF-Spam-Status: No, hits=-2000.7 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Mon, 07 Jul 2014 21:52:13 +0000 Received: (qmail 26190 invoked by uid 99); 7 Jul 2014 21:51:46 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 07 Jul 2014 21:51:46 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 53EC19451D7; Mon, 7 Jul 2014 21:51:46 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.incubator.apache.org Date: Mon, 07 Jul 2014 21:51:46 -0000 Message-Id: <084d626098324ef0b52219e0318e3f53@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/3] TEZ-1080. Add specific Configuration APIs for non MR based Inputs / Outputs. (sseth) X-Virus-Checked: Checked by ClamAV on apache.org Repository: incubator-tez Updated Branches: refs/heads/master a5c320931 -> 3535b142f http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3535b142/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedEdgeConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedEdgeConfiguration.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedEdgeConfiguration.java new file mode 100644 index 0000000..79036d3 --- /dev/null +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedEdgeConfiguration.java @@ -0,0 +1,266 @@ +/* + * * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.tez.runtime.library.conf; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.common.TezJobConfig; +import org.junit.Test; + +public class TestOrderedPartitionedEdgeConfiguration { + + @Test + public void testIncompleteParameters() { + OrderedPartitionedKVEdgeConfiguration.Builder builder = + OrderedPartitionedKVEdgeConfiguration.newBuilder("KEY", "VALUE"); + try { + builder.build(); + fail("Should have failed since the partitioner has not been specified"); + } catch (Exception e) { + assertTrue(e.getMessage().contains("Output must be configured - partitioner")); + } + } + + @Test + public void testNullParams() { + try { + OrderedPartitionedKVEdgeConfiguration.newBuilder(null, "VALUE"); + fail("Expecting a null parameter list to fail"); + } catch (NullPointerException npe) { + assertTrue(npe.getMessage().contains("cannot be null")); + } + + try { + OrderedPartitionedKVEdgeConfiguration.newBuilder("KEY", null); + fail("Expecting a null parameter list to fail"); + } catch (NullPointerException npe) { + assertTrue(npe.getMessage().contains("cannot be null")); + } + + try { + OrderedPartitionedKVEdgeConfiguration.newBuilder("KEY", "VALUE").configureOutput(null, null); + fail("Expecting a null parameter list to fail"); + } catch (NullPointerException npe) { + assertTrue(npe.getMessage().contains("cannot be null")); + } + } + + @Test + public void testDefaultConfigsUsed() { + OrderedPartitionedKVEdgeConfiguration.Builder builder = OrderedPartitionedKVEdgeConfiguration + .newBuilder("KEY", "VALUE") + .configureOutput("PARTITIONER", null).done(); + + OrderedPartitionedKVEdgeConfiguration configuration = builder.build(); + + byte[] outputBytes = configuration.getOutputPayload(); + byte[] inputBytes = configuration.getInputPayload(); + + OnFileSortedOutputConfiguration rebuiltOutput = new OnFileSortedOutputConfiguration(); + rebuiltOutput.fromByteArray(outputBytes); + ShuffledMergedInputConfiguration rebuiltInput = new ShuffledMergedInputConfiguration(); + rebuiltInput.fromByteArray(inputBytes); + + Configuration outputConf = rebuiltOutput.conf; + assertEquals(true, outputConf.getBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD, + TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT)); + assertEquals("TestCodec", + outputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC, "")); + + Configuration inputConf = rebuiltInput.conf; + assertEquals(true, inputConf.getBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD, + TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT)); + assertEquals("TestCodec", + inputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC, "")); + } + + @Test + public void testSpecificIOConfs() { + // Ensures that Output and Input confs are not mixed. + OrderedPartitionedKVEdgeConfiguration.Builder builder = OrderedPartitionedKVEdgeConfiguration + .newBuilder("KEY", "VALUE") + .configureOutput("PARTITIONER", null).done(); + + OrderedPartitionedKVEdgeConfiguration configuration = builder.build(); + + byte[] outputBytes = configuration.getOutputPayload(); + byte[] inputBytes = configuration.getInputPayload(); + + OnFileSortedOutputConfiguration rebuiltOutput = new OnFileSortedOutputConfiguration(); + rebuiltOutput.fromByteArray(outputBytes); + ShuffledMergedInputConfiguration rebuiltInput = new ShuffledMergedInputConfiguration(); + rebuiltInput.fromByteArray(inputBytes); + + Configuration outputConf = rebuiltOutput.conf; + assertEquals("DEFAULT", + outputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC, "DEFAULT")); + + Configuration inputConf = rebuiltInput.conf; + assertEquals("DEFAULT", + inputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC, "DEFAULT")); + } + + @Test + public void tetCommonConf() { + + Configuration fromConf = new Configuration(false); + fromConf.set("test.conf.key.1", "confkey1"); + fromConf.setInt(TezJobConfig.TEZ_RUNTIME_IO_SORT_FACTOR, 3); + fromConf.setFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, 0.11f); + fromConf.setInt(TezJobConfig.TEZ_RUNTIME_IO_SORT_MB, 123); + fromConf.set("io.shouldExist", "io"); + Map additionalConfs = new HashMap(); + additionalConfs.put("test.key.2", "key2"); + additionalConfs.put(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, "1111"); + additionalConfs.put(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT, "0.22f"); + additionalConfs.put(TezJobConfig.TEZ_RUNTIME_INTERNAL_SORTER_CLASS, "CustomSorter"); + additionalConfs.put("file.shouldExist", "file"); + + OrderedPartitionedKVEdgeConfiguration.Builder builder = OrderedPartitionedKVEdgeConfiguration + .newBuilder("KEY", "VALUE") + .configureOutput("PARTITIONER", null).done() + .setAdditionalConfiguration("fs.shouldExist", "fs") + .setAdditionalConfiguration("test.key.1", "key1") + .setAdditionalConfiguration(TezJobConfig.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE, "2222") + .setAdditionalConfiguration(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, "0.33f") + .setAdditionalConfiguration(TezJobConfig.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES, "3333") + .setAdditionalConfiguration(additionalConfs) + .setFromConfiguration(fromConf); + + OrderedPartitionedKVEdgeConfiguration configuration = builder.build(); + + byte[] outputBytes = configuration.getOutputPayload(); + byte[] inputBytes = configuration.getInputPayload(); + + OnFileSortedOutputConfiguration rebuiltOutput = new OnFileSortedOutputConfiguration(); + rebuiltOutput.fromByteArray(outputBytes); + ShuffledMergedInputConfiguration rebuiltInput = new ShuffledMergedInputConfiguration(); + rebuiltInput.fromByteArray(inputBytes); + + Configuration outputConf = rebuiltOutput.conf; + Configuration inputConf = rebuiltInput.conf; + + assertEquals(3, outputConf.getInt(TezJobConfig.TEZ_RUNTIME_IO_SORT_FACTOR, 0)); + assertEquals(1111, outputConf.getInt(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, 0)); + assertEquals(2222, outputConf.getInt(TezJobConfig.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE, 0)); + assertNull(outputConf.get(TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT)); + assertNull(outputConf.get(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT)); + assertNull(outputConf.get(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT)); + assertEquals(123, outputConf.getInt(TezJobConfig.TEZ_RUNTIME_IO_SORT_MB, 0)); + assertEquals("CustomSorter", outputConf.get(TezJobConfig.TEZ_RUNTIME_INTERNAL_SORTER_CLASS)); + assertEquals(3333, + outputConf.getInt(TezJobConfig.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES, 0)); + assertEquals("io", outputConf.get("io.shouldExist")); + assertEquals("file", outputConf.get("file.shouldExist")); + assertEquals("fs", outputConf.get("fs.shouldExist")); + + + assertEquals(3, inputConf.getInt(TezJobConfig.TEZ_RUNTIME_IO_SORT_FACTOR, 0)); + assertEquals(1111, inputConf.getInt(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, 0)); + assertEquals(2222, inputConf.getInt(TezJobConfig.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE, 0)); + assertEquals(0.11f, + inputConf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, 0.0f), 0.001f); + assertEquals(0.22f, + inputConf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT, 0.0f), 0.001f); + assertEquals(0.33f, inputConf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, 0.0f), + 0.001f); + assertNull(inputConf.get(TezJobConfig.TEZ_RUNTIME_IO_SORT_MB)); + assertNull(inputConf.get(TezJobConfig.TEZ_RUNTIME_INTERNAL_SORTER_CLASS)); + assertNull(inputConf.get(TezJobConfig.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES)); + assertEquals("io", inputConf.get("io.shouldExist")); + assertEquals("file", inputConf.get("file.shouldExist")); + assertEquals("fs", inputConf.get("fs.shouldExist")); + + } + + @Test + public void testSetters() { + OrderedPartitionedKVEdgeConfiguration.Builder builder = OrderedPartitionedKVEdgeConfiguration + .newBuilder("KEY", "VALUE") + .setKeyComparatorClass("KEY_COMPARATOR") + .configureOutput("PARTITIONER", null).setSortBufferSize(1111).setSorterNumThreads(2).done() + .configureInput().setMaxSingleMemorySegmentFraction(0.11f).setMergeFraction(0.22f) + .setPostMergeBufferFraction(0.33f).setShuffleBufferFraction(0.44f).done() + .enableCompression("CustomCodec"); + + OrderedPartitionedKVEdgeConfiguration configuration = builder.build(); + + byte[] outputBytes = configuration.getOutputPayload(); + byte[] inputBytes = configuration.getInputPayload(); + + OnFileSortedOutputConfiguration rebuiltOutput = new OnFileSortedOutputConfiguration(); + rebuiltOutput.fromByteArray(outputBytes); + ShuffledMergedInputConfiguration rebuiltInput = new ShuffledMergedInputConfiguration(); + rebuiltInput.fromByteArray(inputBytes); + + Configuration outputConf = rebuiltOutput.conf; + Configuration inputConf = rebuiltInput.conf; + + assertEquals("KEY", outputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS, "")); + assertEquals("VALUE", + outputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS, "")); + assertEquals("PARTITIONER", outputConf.get(TezJobConfig.TEZ_RUNTIME_PARTITIONER_CLASS, "")); + assertEquals(1111, outputConf.getInt(TezJobConfig.TEZ_RUNTIME_IO_SORT_MB, 0)); + assertEquals("CustomCodec", + outputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC, "")); + assertEquals(true, + outputConf.getBoolean(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS, + false)); + assertEquals("KEY_COMPARATOR", + outputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS)); + assertNull(outputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC)); + assertNull(outputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_IS_COMPRESSED)); + assertNull(outputConf.get(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT)); + assertNull(outputConf.get(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT)); + assertNull(outputConf.get(TezJobConfig.TEZ_RUNTIME_INPUT_BUFFER_PERCENT)); + assertNull(outputConf.get(TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT)); + + + assertEquals("KEY", inputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS, "")); + assertEquals("VALUE", + inputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS, "")); + assertEquals("CustomCodec", + inputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC, "")); + assertEquals(true, + inputConf.getBoolean(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_IS_COMPRESSED, + false)); + assertEquals(0.11f, + inputConf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT, 0.0f), 0.001f); + assertEquals(0.22f, inputConf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, 0.0f), + 0.001f); + assertEquals(0.33f, inputConf.getFloat(TezJobConfig.TEZ_RUNTIME_INPUT_BUFFER_PERCENT, 0.0f), + 0.001f); + assertEquals(0.44f, + inputConf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, 0.00f), 0.001f); + assertNull(inputConf.get(TezJobConfig.TEZ_RUNTIME_IO_SORT_MB)); + assertNull(inputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC)); + assertNull(inputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS)); + assertNull(inputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS)); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3535b142/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestShuffledMergedInputConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestShuffledMergedInputConfiguration.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestShuffledMergedInputConfiguration.java new file mode 100644 index 0000000..abb5199 --- /dev/null +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestShuffledMergedInputConfiguration.java @@ -0,0 +1,165 @@ +/* + * * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.tez.runtime.library.conf; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.common.TezJobConfig; +import org.junit.Test; + +public class TestShuffledMergedInputConfiguration { + + @Test + public void testNullParams() { + try { + ShuffledMergedInputConfiguration.newBuilder(null, "VALUE"); + fail("Expecting a null parameter list to fail"); + } catch (NullPointerException npe) { + assertTrue(npe.getMessage().contains("cannot be null")); + } + + try { + ShuffledMergedInputConfiguration.newBuilder("KEY", null); + fail("Expecting a null parameter list to fail"); + } catch (NullPointerException npe) { + assertTrue(npe.getMessage().contains("cannot be null")); + } + } + + @Test + public void testSetters() { + Configuration fromConf = new Configuration(false); + fromConf.set("test.conf.key.1", "confkey1"); + fromConf.setInt(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, 1111); + fromConf.set("io.shouldExist", "io"); + Map additionalConf = new HashMap(); + additionalConf.put("test.key.2", "key2"); + additionalConf.put(TezJobConfig.TEZ_RUNTIME_IO_SORT_FACTOR, "3"); + additionalConf.put("file.shouldExist", "file"); + ShuffledMergedInputConfiguration.Builder builder = + ShuffledMergedInputConfiguration.newBuilder("KEY", "VALUE") + .setKeyComparatorClass("KEY_COMPARATOR") + .enableCompression("CustomCodec") + .setMaxSingleMemorySegmentFraction(0.11f) + .setMergeFraction(0.22f) + .setPostMergeBufferFraction(0.33f) + .setShuffleBufferFraction(0.44f) + .setAdditionalConfiguration("fs.shouldExist", "fs") + .setAdditionalConfiguration("test.key.1", "key1") + .setAdditionalConfiguration(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD, + String.valueOf(false)) + .setAdditionalConfiguration(additionalConf) + .setFromConfiguration(fromConf); + + ShuffledMergedInputConfiguration configuration = builder.build(); + + + byte[] confBytes = configuration.toByteArray(); + ShuffledMergedInputConfiguration rebuilt = new ShuffledMergedInputConfiguration(); + rebuilt.fromByteArray(confBytes); + + Configuration conf = rebuilt.conf; + + // Verify programmatic API usage + assertEquals("KEY", conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS, "")); + assertEquals("VALUE", conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS, "")); + assertEquals("CustomCodec", + conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC, "")); + assertEquals(true, conf.getBoolean(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_IS_COMPRESSED, + false)); + assertEquals(0.11f, conf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT, 0.0f), 0.001f); + assertEquals(0.22f, conf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, 0.0f), 0.001f); + assertEquals(0.33f, conf.getFloat(TezJobConfig.TEZ_RUNTIME_INPUT_BUFFER_PERCENT, 0.0f), 0.001f); + assertEquals(0.44f, conf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, 0.00f), 0.001f); + + // Verify additional configs + assertEquals(false, conf.getBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD, + TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT)); + assertEquals(1111, conf.getInt(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, + TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT)); + assertEquals(3, conf.getInt(TezJobConfig.TEZ_RUNTIME_IO_SORT_FACTOR, -1)); + assertEquals("io", conf.get("io.shouldExist")); + assertEquals("file", conf.get("file.shouldExist")); + assertEquals("fs", conf.get("fs.shouldExist")); + assertNull(conf.get("test.conf.key.1")); + assertNull(conf.get("test.key.1")); + assertNull(conf.get("test.key.2")); + } + + @Test + public void testDefaultConfigsUsed() { + ShuffledMergedInputConfiguration.Builder builder = + ShuffledMergedInputConfiguration.newBuilder("KEY", "VALUE"); + ShuffledMergedInputConfiguration configuration = builder.build(); + + byte[] confBytes = configuration.toByteArray(); + ShuffledMergedInputConfiguration rebuilt = new ShuffledMergedInputConfiguration(); + rebuilt.fromByteArray(confBytes); + + Configuration conf = rebuilt.conf; + + assertEquals(true, conf.getBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD, + TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT)); + + // Default Output property present. + assertEquals("TestCodec", + conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC, "")); + // Input property should be absent + assertEquals("DEFAULT", + conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC, "DEFAULT")); + + // Verify whatever was configured + assertEquals("KEY", conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS, "")); + assertEquals("VALUE", conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS, "")); + } + + @Test + public void testCombinerConfigs() { + Configuration combinerConf = new Configuration(false); + combinerConf.set("combiner.test.key", "COMBINERKEY"); + combinerConf + .set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC, "InvalidKeyOverride"); + ShuffledMergedInputConfiguration.Builder builder = + ShuffledMergedInputConfiguration.newBuilder("KEY", "VALUE") + .setCombiner("COMBINER", combinerConf); + + ShuffledMergedInputConfiguration configuration = builder.build(); + + byte[] confBytes = configuration.toByteArray(); + ShuffledMergedInputConfiguration rebuilt = new ShuffledMergedInputConfiguration(); + rebuilt.fromByteArray(confBytes); + + Configuration conf = rebuilt.conf; + + // Default Output property should not be overridden based on partitioner config + assertEquals("TestCodec", + conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC, "")); + + assertEquals("COMBINERKEY", conf.get("combiner.test.key")); + } +} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3535b142/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestShuffledUnorderedKVInputConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestShuffledUnorderedKVInputConfiguration.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestShuffledUnorderedKVInputConfiguration.java new file mode 100644 index 0000000..d4de270 --- /dev/null +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestShuffledUnorderedKVInputConfiguration.java @@ -0,0 +1,137 @@ +/* + * * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.tez.runtime.library.conf; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.common.TezJobConfig; +import org.junit.Test; + +public class TestShuffledUnorderedKVInputConfiguration { + + @Test + public void testNullParams() { + try { + ShuffledUnorderedKVInputConfiguration.newBuilder(null, "VALUE"); + fail("Expecting a null parameter list to fail"); + } catch (NullPointerException npe) { + assertTrue(npe.getMessage().contains("cannot be null")); + } + + try { + ShuffledUnorderedKVInputConfiguration.newBuilder("KEY", null); + fail("Expecting a null parameter list to fail"); + } catch (NullPointerException npe) { + assertTrue(npe.getMessage().contains("cannot be null")); + } + } + + @Test + public void testSetters() { + Configuration fromConf = new Configuration(false); + fromConf.set("test.conf.key.1", "confkey1"); + fromConf.setInt(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, 1111); + fromConf.set("io.shouldExist", "io"); + Map additionalConf = new HashMap(); + additionalConf.put("test.key.2", "key2"); + additionalConf.put(TezJobConfig.TEZ_RUNTIME_IO_SORT_FACTOR, "3"); + additionalConf.put("file.shouldExist", "file"); + ShuffledUnorderedKVInputConfiguration.Builder builder = + ShuffledUnorderedKVInputConfiguration.newBuilder("KEY", "VALUE") + .enableCompression("CustomCodec") + .setMaxSingleMemorySegmentFraction(0.11f) + .setMergeFraction(0.22f) + .setShuffleBufferFraction(0.33f) + .setAdditionalConfiguration("fs.shouldExist", "fs") + .setAdditionalConfiguration("test.key.1", "key1") + .setAdditionalConfiguration(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD, + String.valueOf(false)) + .setAdditionalConfiguration(additionalConf) + .setFromConfiguration(fromConf); + + ShuffledUnorderedKVInputConfiguration configuration = builder.build(); + + + byte[] confBytes = configuration.toByteArray(); + ShuffledUnorderedKVInputConfiguration rebuilt = new ShuffledUnorderedKVInputConfiguration(); + rebuilt.fromByteArray(confBytes); + + Configuration conf = rebuilt.conf; + + // Verify programmatic API usage + assertEquals("KEY", conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS, "")); + assertEquals("VALUE", conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS, "")); + assertEquals("CustomCodec", + conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC, "")); + assertEquals(true, conf.getBoolean(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_IS_COMPRESSED, + false)); + assertEquals(0.11f, conf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT, 0.0f), 0.001f); + assertEquals(0.22f, conf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, 0.0f), 0.001f); + assertEquals(0.33f, conf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, 0.00f), 0.001f); + + // Verify additional configs + assertEquals(false, conf.getBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD, + TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT)); + assertEquals(1111, conf.getInt(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, + TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT)); + assertEquals(3, conf.getInt(TezJobConfig.TEZ_RUNTIME_IO_SORT_FACTOR, -1)); + assertEquals("io", conf.get("io.shouldExist")); + assertEquals("file", conf.get("file.shouldExist")); + assertEquals("fs", conf.get("fs.shouldExist")); + assertNull(conf.get("test.conf.key.1")); + assertNull(conf.get("test.key.1")); + assertNull(conf.get("test.key.2")); + } + + @Test + public void testDefaultConfigsUsed() { + ShuffledUnorderedKVInputConfiguration.Builder builder = + ShuffledUnorderedKVInputConfiguration.newBuilder("KEY", "VALUE"); + ShuffledUnorderedKVInputConfiguration configuration = builder.build(); + + byte[] confBytes = configuration.toByteArray(); + ShuffledUnorderedKVInputConfiguration rebuilt = new ShuffledUnorderedKVInputConfiguration(); + rebuilt.fromByteArray(confBytes); + + Configuration conf = rebuilt.conf; + + assertEquals(true, conf.getBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD, + TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT)); + + // Default Output property present. + assertEquals("TestCodec", + conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC, "")); + // Input property should be absent + assertEquals("DEFAULT", + conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC, "DEFAULT")); + + // Verify whatever was configured + assertEquals("KEY", conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS, "")); + assertEquals("VALUE", conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS, "")); + } +} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3535b142/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedEdgeConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedEdgeConfiguration.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedEdgeConfiguration.java new file mode 100644 index 0000000..23acf04 --- /dev/null +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedEdgeConfiguration.java @@ -0,0 +1,205 @@ +/* + * * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.tez.runtime.library.conf; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.common.TezJobConfig; +import org.junit.Test; + +public class TestUnorderedPartitionedEdgeConfiguration { + @Test + public void testIncompleteParameters() { + UnorderedPartitionedKVEdgeConfiguration.Builder builder = + UnorderedPartitionedKVEdgeConfiguration.newBuilder("KEY", "VALUE"); + try { + builder.build(); + fail("Should have failed since the partitioner has not been specified"); + } catch (Exception e) { + assertTrue(e.getMessage().contains("Output must be configured - partitioner")); + } + } + + @Test + public void testNullParams() { + try { + UnorderedPartitionedKVEdgeConfiguration.newBuilder(null, "VALUE"); + fail("Expecting a null parameter list to fail"); + } catch (NullPointerException npe) { + assertTrue(npe.getMessage().contains("cannot be null")); + } + + try { + UnorderedPartitionedKVEdgeConfiguration.newBuilder("KEY", null); + fail("Expecting a null parameter list to fail"); + } catch (NullPointerException npe) { + assertTrue(npe.getMessage().contains("cannot be null")); + } + + try { + UnorderedPartitionedKVEdgeConfiguration.newBuilder("KEY", "VALUE").configureOutput(null, null); + fail("Expecting a null parameter list to fail"); + } catch (NullPointerException npe) { + assertTrue(npe.getMessage().contains("cannot be null")); + } + } + + @Test + public void testDefaultConfigsUsed() { + UnorderedPartitionedKVEdgeConfiguration.Builder builder = + UnorderedPartitionedKVEdgeConfiguration.newBuilder("KEY", "VALUE") + .configureOutput("PARTITIONER", null).done(); + + UnorderedPartitionedKVEdgeConfiguration configuration = builder.build(); + + byte[] outputBytes = configuration.getOutputPayload(); + byte[] inputBytes = configuration.getInputPayload(); + + OnFileUnorderedPartitionedKVOutputConfiguration rebuiltOutput = + new OnFileUnorderedPartitionedKVOutputConfiguration(); + rebuiltOutput.fromByteArray(outputBytes); + ShuffledUnorderedKVInputConfiguration rebuiltInput = + new ShuffledUnorderedKVInputConfiguration(); + rebuiltInput.fromByteArray(inputBytes); + + Configuration outputConf = rebuiltOutput.conf; + assertEquals(true, outputConf.getBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD, + TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT)); + assertEquals("TestCodec", + outputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC, "")); + + Configuration inputConf = rebuiltInput.conf; + assertEquals(true, inputConf.getBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD, + TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT)); + assertEquals("TestCodec", + inputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC, "")); + } + + @Test + public void testSpecificIOConfs() { + // Ensures that Output and Input confs are not mixed. + UnorderedPartitionedKVEdgeConfiguration.Builder builder = + UnorderedPartitionedKVEdgeConfiguration.newBuilder("KEY", "VALUE") + .configureOutput("PARTITIONER", null).done(); + + UnorderedPartitionedKVEdgeConfiguration configuration = builder.build(); + + byte[] outputBytes = configuration.getOutputPayload(); + byte[] inputBytes = configuration.getInputPayload(); + + OnFileUnorderedPartitionedKVOutputConfiguration rebuiltOutput = + new OnFileUnorderedPartitionedKVOutputConfiguration(); + rebuiltOutput.fromByteArray(outputBytes); + ShuffledUnorderedKVInputConfiguration rebuiltInput = + new ShuffledUnorderedKVInputConfiguration(); + rebuiltInput.fromByteArray(inputBytes); + + Configuration outputConf = rebuiltOutput.conf; + assertEquals("DEFAULT", + outputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC, "DEFAULT")); + + Configuration inputConf = rebuiltInput.conf; + assertEquals("DEFAULT", + inputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC, "DEFAULT")); + } + + @Test + public void tetCommonConf() { + + Configuration fromConf = new Configuration(false); + fromConf.set("test.conf.key.1", "confkey1"); + fromConf.setBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD, false); + fromConf.setFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, 0.11f); + fromConf.setInt(TezJobConfig.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB, 123); + fromConf.set("io.shouldExist", "io"); + Map additionalConfs = new HashMap(); + additionalConfs.put("test.key.2", "key2"); + additionalConfs.put(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, "1111"); + additionalConfs.put(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT, "0.22f"); + additionalConfs + .put(TezJobConfig.TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES, "2222"); + additionalConfs.put("file.shouldExist", "file"); + + UnorderedPartitionedKVEdgeConfiguration.Builder builder = UnorderedPartitionedKVEdgeConfiguration + .newBuilder("KEY", + "VALUE") + .configureOutput("PARTITIONER", null).done() + .setAdditionalConfiguration("fs.shouldExist", "fs") + .setAdditionalConfiguration("test.key.1", "key1") + .setAdditionalConfiguration(TezJobConfig.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE, "3333") + .setAdditionalConfiguration(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, "0.33f") + .setAdditionalConfiguration(additionalConfs) + .setFromConfiguration(fromConf); + + UnorderedPartitionedKVEdgeConfiguration configuration = builder.build(); + + byte[] outputBytes = configuration.getOutputPayload(); + byte[] inputBytes = configuration.getInputPayload(); + + OnFileUnorderedPartitionedKVOutputConfiguration rebuiltOutput = + new OnFileUnorderedPartitionedKVOutputConfiguration(); + rebuiltOutput.fromByteArray(outputBytes); + ShuffledUnorderedKVInputConfiguration rebuiltInput = + new ShuffledUnorderedKVInputConfiguration(); + rebuiltInput.fromByteArray(inputBytes); + + Configuration outputConf = rebuiltOutput.conf; + Configuration inputConf = rebuiltInput.conf; + + assertEquals(false, outputConf.getBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD, true)); + assertEquals(1111, outputConf.getInt(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, 0)); + assertEquals(3333, outputConf.getInt(TezJobConfig.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE, 0)); + assertNull(outputConf.get(TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT)); + assertNull(outputConf.get(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT)); + assertNull(outputConf.get(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT)); + assertEquals(123, + outputConf.getInt(TezJobConfig.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB, 0)); + assertEquals(2222, + outputConf.getInt(TezJobConfig.TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES, 0)); + assertEquals("io", outputConf.get("io.shouldExist")); + assertEquals("file", outputConf.get("file.shouldExist")); + assertEquals("fs", outputConf.get("fs.shouldExist")); + + + assertEquals(false, inputConf.getBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD, true)); + assertEquals(1111, inputConf.getInt(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, 0)); + assertEquals(3333, inputConf.getInt(TezJobConfig.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE, 0)); + assertEquals(0.11f, + inputConf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, 0.0f), 0.001f); + assertEquals(0.22f, + inputConf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT, 0.0f), 0.001f); + assertEquals(0.33f, inputConf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, 0.0f), + 0.001f); + assertNull(inputConf.get(TezJobConfig.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB)); + assertNull(inputConf.get(TezJobConfig.TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES)); + assertEquals("io", inputConf.get("io.shouldExist")); + assertEquals("file", inputConf.get("file.shouldExist")); + assertEquals("fs", inputConf.get("fs.shouldExist")); + + } +} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3535b142/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedUnpartitionedEdgeConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedUnpartitionedEdgeConfiguration.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedUnpartitionedEdgeConfiguration.java new file mode 100644 index 0000000..1fbe653 --- /dev/null +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedUnpartitionedEdgeConfiguration.java @@ -0,0 +1,175 @@ +/* + * * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.tez.runtime.library.conf; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.common.TezJobConfig; +import org.junit.Test; + +public class TestUnorderedUnpartitionedEdgeConfiguration { + + @Test + public void testNullParams() { + try { + UnorderedUnpartitionedKVEdgeConfiguration.newBuilder(null, "VALUE"); + fail("Expecting a null parameter list to fail"); + } catch (NullPointerException npe) { + assertTrue(npe.getMessage().contains("cannot be null")); + } + + try { + UnorderedUnpartitionedKVEdgeConfiguration.newBuilder("KEY", null); + fail("Expecting a null parameter list to fail"); + } catch (NullPointerException npe) { + assertTrue(npe.getMessage().contains("cannot be null")); + } + } + + @Test + public void testDefaultConfigsUsed() { + UnorderedUnpartitionedKVEdgeConfiguration.Builder builder = + UnorderedUnpartitionedKVEdgeConfiguration.newBuilder("KEY", "VALUE"); + + UnorderedUnpartitionedKVEdgeConfiguration configuration = builder.build(); + + byte[] outputBytes = configuration.getOutputPayload(); + byte[] inputBytes = configuration.getInputPayload(); + + OnFileUnorderedKVOutputConfiguration rebuiltOutput = + new OnFileUnorderedKVOutputConfiguration(); + rebuiltOutput.fromByteArray(outputBytes); + ShuffledUnorderedKVInputConfiguration rebuiltInput = + new ShuffledUnorderedKVInputConfiguration(); + rebuiltInput.fromByteArray(inputBytes); + + Configuration outputConf = rebuiltOutput.conf; + assertEquals(true, outputConf.getBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD, + TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT)); + assertEquals("TestCodec", + outputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC, "")); + + Configuration inputConf = rebuiltInput.conf; + assertEquals(true, inputConf.getBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD, + TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT)); + assertEquals("TestCodec", + inputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC, "")); + } + + @Test + public void testSpecificIOConfs() { + // Ensures that Output and Input confs are not mixed. + UnorderedUnpartitionedKVEdgeConfiguration.Builder builder = + UnorderedUnpartitionedKVEdgeConfiguration.newBuilder("KEY", "VALUE"); + + UnorderedUnpartitionedKVEdgeConfiguration configuration = builder.build(); + + byte[] outputBytes = configuration.getOutputPayload(); + byte[] inputBytes = configuration.getInputPayload(); + + OnFileUnorderedKVOutputConfiguration rebuiltOutput = + new OnFileUnorderedKVOutputConfiguration(); + rebuiltOutput.fromByteArray(outputBytes); + ShuffledUnorderedKVInputConfiguration rebuiltInput = + new ShuffledUnorderedKVInputConfiguration(); + rebuiltInput.fromByteArray(inputBytes); + + Configuration outputConf = rebuiltOutput.conf; + assertEquals("DEFAULT", + outputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC, "DEFAULT")); + + Configuration inputConf = rebuiltInput.conf; + assertEquals("DEFAULT", + inputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC, "DEFAULT")); + } + + @Test + public void tetCommonConf() { + + Configuration fromConf = new Configuration(false); + fromConf.set("test.conf.key.1", "confkey1"); + fromConf.setBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD, false); + fromConf.setFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, 0.11f); + fromConf.set("io.shouldExist", "io"); + Map additionalConfs = new HashMap(); + additionalConfs.put("test.key.2", "key2"); + additionalConfs.put(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, "1111"); + additionalConfs.put(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT, "0.22f"); + additionalConfs.put("file.shouldExist", "file"); + + UnorderedUnpartitionedKVEdgeConfiguration.Builder builder = UnorderedUnpartitionedKVEdgeConfiguration + .newBuilder("KEY", + "VALUE") + .setAdditionalConfiguration("fs.shouldExist", "fs") + .setAdditionalConfiguration("test.key.1", "key1") + .setAdditionalConfiguration(TezJobConfig.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE, "3333") + .setAdditionalConfiguration(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, "0.33f") + .setAdditionalConfiguration(additionalConfs) + .setFromConfiguration(fromConf); + + UnorderedUnpartitionedKVEdgeConfiguration configuration = builder.build(); + + byte[] outputBytes = configuration.getOutputPayload(); + byte[] inputBytes = configuration.getInputPayload(); + + OnFileUnorderedKVOutputConfiguration rebuiltOutput = + new OnFileUnorderedKVOutputConfiguration(); + rebuiltOutput.fromByteArray(outputBytes); + ShuffledUnorderedKVInputConfiguration rebuiltInput = + new ShuffledUnorderedKVInputConfiguration(); + rebuiltInput.fromByteArray(inputBytes); + + Configuration outputConf = rebuiltOutput.conf; + Configuration inputConf = rebuiltInput.conf; + + assertEquals(false, outputConf.getBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD, true)); + assertEquals(1111, outputConf.getInt(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, 0)); + assertEquals(3333, outputConf.getInt(TezJobConfig.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE, 0)); + assertNull(outputConf.get(TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT)); + assertNull(outputConf.get(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT)); + assertNull(outputConf.get(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT)); + assertEquals("io", outputConf.get("io.shouldExist")); + assertEquals("file", outputConf.get("file.shouldExist")); + assertEquals("fs", outputConf.get("fs.shouldExist")); + + + assertEquals(false, inputConf.getBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD, true)); + assertEquals(1111, inputConf.getInt(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, 0)); + assertEquals(3333, inputConf.getInt(TezJobConfig.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE, 0)); + assertEquals(0.11f, + inputConf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, 0.0f), 0.001f); + assertEquals(0.22f, + inputConf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT, 0.0f), 0.001f); + assertEquals(0.33f, inputConf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, 0.0f), + 0.001f); + assertEquals("io", inputConf.get("io.shouldExist")); + assertEquals("file", inputConf.get("file.shouldExist")); + assertEquals("fs", inputConf.get("fs.shouldExist")); + + } +} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3535b142/tez-runtime-library/src/test/resources/tez-site.xml ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/resources/tez-site.xml b/tez-runtime-library/src/test/resources/tez-site.xml new file mode 100644 index 0000000..399bfab --- /dev/null +++ b/tez-runtime-library/src/test/resources/tez-site.xml @@ -0,0 +1,35 @@ + + + + + + + tez.runtime.ifile.readahead + true + + + + tez.runtime.intermediate-output.compress.codec + TestCodec + + + + tez.runtime.intermediate-input.compress.codec + TestCodec + +