tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [1/3] TEZ-1080. Add specific Configuration APIs for non MR based Inputs / Outputs. (sseth)
Date Mon, 07 Jul 2014 21:51:46 GMT
Repository: incubator-tez
Updated Branches:
  refs/heads/master a5c320931 -> 3535b142f


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3535b142/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedEdgeConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedEdgeConfiguration.java
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedEdgeConfiguration.java
new file mode 100644
index 0000000..79036d3
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedEdgeConfiguration.java
@@ -0,0 +1,266 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.tez.runtime.library.conf;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezJobConfig;
+import org.junit.Test;
+
+public class TestOrderedPartitionedEdgeConfiguration {
+
+  /**
+   * Building an ordered-partitioned edge configuration without ever calling configureOutput
+   * must fail, and the failure message must mention the missing partitioner.
+   */
+  @Test
+  public void testIncompleteParameters() {
+    final String expectedFragment = "Output must be configured - partitioner";
+    OrderedPartitionedKVEdgeConfiguration.Builder unconfigured =
+        OrderedPartitionedKVEdgeConfiguration.newBuilder("KEY", "VALUE");
+    try {
+      unconfigured.build();
+      fail("Should have failed since the partitioner has not been specified");
+    } catch (Exception expected) {
+      String message = expected.getMessage();
+      assertTrue(message.contains(expectedFragment));
+    }
+  }
+
+  @Test
+  public void testNullParams() {
+    try {
+      OrderedPartitionedKVEdgeConfiguration.newBuilder(null, "VALUE");
+      fail("Expecting a null parameter list to fail");
+    } catch (NullPointerException npe) {
+      assertTrue(npe.getMessage().contains("cannot be null"));
+    }
+
+    try {
+      OrderedPartitionedKVEdgeConfiguration.newBuilder("KEY", null);
+      fail("Expecting a null parameter list to fail");
+    } catch (NullPointerException npe) {
+      assertTrue(npe.getMessage().contains("cannot be null"));
+    }
+
+    try {
+      OrderedPartitionedKVEdgeConfiguration.newBuilder("KEY", "VALUE").configureOutput(null,
null);
+      fail("Expecting a null parameter list to fail");
+    } catch (NullPointerException npe) {
+      assertTrue(npe.getMessage().contains("cannot be null"));
+    }
+  }
+
+  /**
+   * With only the mandatory output configuration supplied, both payloads are serialized,
+   * rebuilt from bytes, and checked for the read-ahead flag and the side-specific codec.
+   *
+   * NOTE(review): "TestCodec" is never set in this test; presumably it comes from a
+   * configuration resource on the test classpath - confirm.
+   */
+  @Test
+  public void testDefaultConfigsUsed() {
+    OrderedPartitionedKVEdgeConfiguration edgeConfig =
+        OrderedPartitionedKVEdgeConfiguration.newBuilder("KEY", "VALUE")
+            .configureOutput("PARTITIONER", null).done()
+            .build();
+
+    byte[] serializedOutput = edgeConfig.getOutputPayload();
+    byte[] serializedInput = edgeConfig.getInputPayload();
+
+    OnFileSortedOutputConfiguration outputSide = new OnFileSortedOutputConfiguration();
+    outputSide.fromByteArray(serializedOutput);
+    ShuffledMergedInputConfiguration inputSide = new ShuffledMergedInputConfiguration();
+    inputSide.fromByteArray(serializedInput);
+
+    // Output side.
+    Configuration outputConf = outputSide.conf;
+    boolean outputReadAhead = outputConf.getBoolean(
+        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
+        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT);
+    assertTrue(outputReadAhead);
+    String outputCodec =
+        outputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC, "");
+    assertEquals("TestCodec", outputCodec);
+
+    // Input side.
+    Configuration inputConf = inputSide.conf;
+    boolean inputReadAhead = inputConf.getBoolean(
+        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
+        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT);
+    assertTrue(inputReadAhead);
+    String inputCodec =
+        inputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC, "");
+    assertEquals("TestCodec", inputCodec);
+  }
+
+  /**
+   * Ensures that Output and Input confs are not mixed: looking up the input codec key in the
+   * rebuilt output configuration, and the output codec key in the rebuilt input
+   * configuration, must fall back to the supplied default value.
+   */
+  @Test
+  public void testSpecificIOConfs() {
+    final String fallback = "DEFAULT";
+
+    OrderedPartitionedKVEdgeConfiguration edgeConfig =
+        OrderedPartitionedKVEdgeConfiguration.newBuilder("KEY", "VALUE")
+            .configureOutput("PARTITIONER", null).done()
+            .build();
+
+    byte[] serializedOutput = edgeConfig.getOutputPayload();
+    byte[] serializedInput = edgeConfig.getInputPayload();
+
+    OnFileSortedOutputConfiguration outputSide = new OnFileSortedOutputConfiguration();
+    outputSide.fromByteArray(serializedOutput);
+    ShuffledMergedInputConfiguration inputSide = new ShuffledMergedInputConfiguration();
+    inputSide.fromByteArray(serializedInput);
+
+    // The output payload is queried for the INPUT codec key.
+    Configuration outputConf = outputSide.conf;
+    String inputKeyOnOutputSide =
+        outputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC, fallback);
+    assertEquals("DEFAULT", inputKeyOnOutputSide);
+
+    // The input payload is queried for the OUTPUT codec key.
+    Configuration inputConf = inputSide.conf;
+    String outputKeyOnInputSide =
+        inputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC, fallback);
+    assertEquals("DEFAULT", outputKeyOnInputSide);
+  }
+
+  @Test
+  public void tetCommonConf() {
+
+    Configuration fromConf = new Configuration(false);
+    fromConf.set("test.conf.key.1", "confkey1");
+    fromConf.setInt(TezJobConfig.TEZ_RUNTIME_IO_SORT_FACTOR, 3);
+    fromConf.setFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, 0.11f);
+    fromConf.setInt(TezJobConfig.TEZ_RUNTIME_IO_SORT_MB, 123);
+    fromConf.set("io.shouldExist", "io");
+    Map<String, String> additionalConfs = new HashMap<String, String>();
+    additionalConfs.put("test.key.2", "key2");
+    additionalConfs.put(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, "1111");
+    additionalConfs.put(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT, "0.22f");
+    additionalConfs.put(TezJobConfig.TEZ_RUNTIME_INTERNAL_SORTER_CLASS, "CustomSorter");
+    additionalConfs.put("file.shouldExist", "file");
+
+    OrderedPartitionedKVEdgeConfiguration.Builder builder = OrderedPartitionedKVEdgeConfiguration
+        .newBuilder("KEY", "VALUE")
+        .configureOutput("PARTITIONER", null).done()
+        .setAdditionalConfiguration("fs.shouldExist", "fs")
+        .setAdditionalConfiguration("test.key.1", "key1")
+        .setAdditionalConfiguration(TezJobConfig.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE, "2222")
+        .setAdditionalConfiguration(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, "0.33f")
+        .setAdditionalConfiguration(TezJobConfig.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES,
"3333")
+        .setAdditionalConfiguration(additionalConfs)
+        .setFromConfiguration(fromConf);
+
+    OrderedPartitionedKVEdgeConfiguration configuration = builder.build();
+
+    byte[] outputBytes = configuration.getOutputPayload();
+    byte[] inputBytes = configuration.getInputPayload();
+
+    OnFileSortedOutputConfiguration rebuiltOutput = new OnFileSortedOutputConfiguration();
+    rebuiltOutput.fromByteArray(outputBytes);
+    ShuffledMergedInputConfiguration rebuiltInput = new ShuffledMergedInputConfiguration();
+    rebuiltInput.fromByteArray(inputBytes);
+
+    Configuration outputConf = rebuiltOutput.conf;
+    Configuration inputConf = rebuiltInput.conf;
+
+    assertEquals(3, outputConf.getInt(TezJobConfig.TEZ_RUNTIME_IO_SORT_FACTOR, 0));
+    assertEquals(1111, outputConf.getInt(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES,
0));
+    assertEquals(2222, outputConf.getInt(TezJobConfig.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE, 0));
+    assertNull(outputConf.get(TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT));
+    assertNull(outputConf.get(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT));
+    assertNull(outputConf.get(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT));
+    assertEquals(123, outputConf.getInt(TezJobConfig.TEZ_RUNTIME_IO_SORT_MB, 0));
+    assertEquals("CustomSorter", outputConf.get(TezJobConfig.TEZ_RUNTIME_INTERNAL_SORTER_CLASS));
+    assertEquals(3333,
+        outputConf.getInt(TezJobConfig.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES, 0));
+    assertEquals("io", outputConf.get("io.shouldExist"));
+    assertEquals("file", outputConf.get("file.shouldExist"));
+    assertEquals("fs", outputConf.get("fs.shouldExist"));
+
+
+    assertEquals(3, inputConf.getInt(TezJobConfig.TEZ_RUNTIME_IO_SORT_FACTOR, 0));
+    assertEquals(1111, inputConf.getInt(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, 0));
+    assertEquals(2222, inputConf.getInt(TezJobConfig.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE, 0));
+    assertEquals(0.11f,
+        inputConf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, 0.0f),
0.001f);
+    assertEquals(0.22f,
+        inputConf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT, 0.0f),
0.001f);
+    assertEquals(0.33f, inputConf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT,
0.0f),
+        0.001f);
+    assertNull(inputConf.get(TezJobConfig.TEZ_RUNTIME_IO_SORT_MB));
+    assertNull(inputConf.get(TezJobConfig.TEZ_RUNTIME_INTERNAL_SORTER_CLASS));
+    assertNull(inputConf.get(TezJobConfig.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES));
+    assertEquals("io", inputConf.get("io.shouldExist"));
+    assertEquals("file", inputConf.get("file.shouldExist"));
+    assertEquals("fs", inputConf.get("fs.shouldExist"));
+
+  }
+
+  /**
+   * Exercises the typed setters of the edge builder (key comparator, output sort buffer and
+   * sorter threads, input memory fractions, compression codec) and verifies that each value
+   * lands only in the payload of the side it belongs to.
+   */
+  @Test
+  public void testSetters() {
+    OrderedPartitionedKVEdgeConfiguration edgeConfig =
+        OrderedPartitionedKVEdgeConfiguration.newBuilder("KEY", "VALUE")
+            .setKeyComparatorClass("KEY_COMPARATOR")
+            .configureOutput("PARTITIONER", null)
+            .setSortBufferSize(1111)
+            .setSorterNumThreads(2)
+            .done()
+            .configureInput()
+            .setMaxSingleMemorySegmentFraction(0.11f)
+            .setMergeFraction(0.22f)
+            .setPostMergeBufferFraction(0.33f)
+            .setShuffleBufferFraction(0.44f)
+            .done()
+            .enableCompression("CustomCodec")
+            .build();
+
+    byte[] serializedOutput = edgeConfig.getOutputPayload();
+    byte[] serializedInput = edgeConfig.getInputPayload();
+
+    OnFileSortedOutputConfiguration outputSide = new OnFileSortedOutputConfiguration();
+    outputSide.fromByteArray(serializedOutput);
+    ShuffledMergedInputConfiguration inputSide = new ShuffledMergedInputConfiguration();
+    inputSide.fromByteArray(serializedInput);
+
+    Configuration outConf = outputSide.conf;
+    Configuration inConf = inputSide.conf;
+    final float delta = 0.001f;
+
+    // ---- Output payload: values that must be present ----
+    assertEquals("KEY",
+        outConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS, ""));
+    assertEquals("VALUE",
+        outConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS, ""));
+    assertEquals("PARTITIONER",
+        outConf.get(TezJobConfig.TEZ_RUNTIME_PARTITIONER_CLASS, ""));
+    assertEquals(1111, outConf.getInt(TezJobConfig.TEZ_RUNTIME_IO_SORT_MB, 0));
+    assertEquals("CustomCodec",
+        outConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC, ""));
+    assertTrue(outConf.getBoolean(
+        TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS, false));
+    assertEquals("KEY_COMPARATOR",
+        outConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS));
+    // ---- Output payload: input-only keys must be absent ----
+    assertNull(outConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC));
+    assertNull(outConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_IS_COMPRESSED));
+    assertNull(outConf.get(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT));
+    assertNull(outConf.get(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT));
+    assertNull(outConf.get(TezJobConfig.TEZ_RUNTIME_INPUT_BUFFER_PERCENT));
+    assertNull(outConf.get(TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT));
+
+    // ---- Input payload: values that must be present ----
+    assertEquals("KEY",
+        inConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS, ""));
+    assertEquals("VALUE",
+        inConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS, ""));
+    assertEquals("CustomCodec",
+        inConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC, ""));
+    assertTrue(inConf.getBoolean(
+        TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_IS_COMPRESSED, false));
+    assertEquals(0.11f,
+        inConf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT, 0.0f), delta);
+    assertEquals(0.22f,
+        inConf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, 0.0f), delta);
+    assertEquals(0.33f,
+        inConf.getFloat(TezJobConfig.TEZ_RUNTIME_INPUT_BUFFER_PERCENT, 0.0f), delta);
+    assertEquals(0.44f,
+        inConf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, 0.00f), delta);
+    // ---- Input payload: output-only keys must be absent ----
+    assertNull(inConf.get(TezJobConfig.TEZ_RUNTIME_IO_SORT_MB));
+    assertNull(inConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC));
+    assertNull(inConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS));
+    assertNull(
+        inConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3535b142/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestShuffledMergedInputConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestShuffledMergedInputConfiguration.java
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestShuffledMergedInputConfiguration.java
new file mode 100644
index 0000000..abb5199
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestShuffledMergedInputConfiguration.java
@@ -0,0 +1,165 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.tez.runtime.library.conf;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezJobConfig;
+import org.junit.Test;
+
+public class TestShuffledMergedInputConfiguration {
+
+  /**
+   * A null key class or a null value class passed to newBuilder must raise a
+   * NullPointerException whose message contains "cannot be null".
+   */
+  @Test
+  public void testNullParams() {
+    final String expectedFragment = "cannot be null";
+
+    // Null key class.
+    try {
+      ShuffledMergedInputConfiguration.newBuilder(null, "VALUE");
+      fail("Expecting a null parameter list to fail");
+    } catch (NullPointerException expected) {
+      assertTrue(expected.getMessage().contains(expectedFragment));
+    }
+
+    // Null value class.
+    try {
+      ShuffledMergedInputConfiguration.newBuilder("KEY", null);
+      fail("Expecting a null parameter list to fail");
+    } catch (NullPointerException expected) {
+      assertTrue(expected.getMessage().contains(expectedFragment));
+    }
+  }
+
+  /**
+   * Configures a ShuffledMergedInputConfiguration through its typed setters, the single-key
+   * and map forms of setAdditionalConfiguration, and setFromConfiguration; serializes it,
+   * rebuilds it from bytes and checks the resulting Configuration.
+   *
+   * As asserted at the end, the "test."-prefixed keys supplied through any of the three
+   * routes are not present after the round trip, while the io./file./fs. keys are.
+   */
+  @Test
+  public void testSetters() {
+    // Settings handed over as a whole Configuration object.
+    Configuration sourceConf = new Configuration(false);
+    sourceConf.set("test.conf.key.1", "confkey1");
+    sourceConf.setInt(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, 1111);
+    sourceConf.set("io.shouldExist", "io");
+
+    // Settings handed over as a map.
+    Map<String, String> extraSettings = new HashMap<String, String>();
+    extraSettings.put("test.key.2", "key2");
+    extraSettings.put(TezJobConfig.TEZ_RUNTIME_IO_SORT_FACTOR, "3");
+    extraSettings.put("file.shouldExist", "file");
+
+    ShuffledMergedInputConfiguration original =
+        ShuffledMergedInputConfiguration.newBuilder("KEY", "VALUE")
+            .setKeyComparatorClass("KEY_COMPARATOR")
+            .enableCompression("CustomCodec")
+            .setMaxSingleMemorySegmentFraction(0.11f)
+            .setMergeFraction(0.22f)
+            .setPostMergeBufferFraction(0.33f)
+            .setShuffleBufferFraction(0.44f)
+            .setAdditionalConfiguration("fs.shouldExist", "fs")
+            .setAdditionalConfiguration("test.key.1", "key1")
+            .setAdditionalConfiguration(
+                TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD, String.valueOf(false))
+            .setAdditionalConfiguration(extraSettings)
+            .setFromConfiguration(sourceConf)
+            .build();
+
+    // Round trip through the byte representation.
+    byte[] serialized = original.toByteArray();
+    ShuffledMergedInputConfiguration roundTripped = new ShuffledMergedInputConfiguration();
+    roundTripped.fromByteArray(serialized);
+
+    Configuration conf = roundTripped.conf;
+    final float delta = 0.001f;
+
+    // Verify programmatic API usage
+    assertEquals("KEY",
+        conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS, ""));
+    assertEquals("VALUE",
+        conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS, ""));
+    assertEquals("CustomCodec",
+        conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC, ""));
+    assertTrue(
+        conf.getBoolean(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_IS_COMPRESSED, false));
+    assertEquals(0.11f,
+        conf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT, 0.0f), delta);
+    assertEquals(0.22f,
+        conf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, 0.0f), delta);
+    assertEquals(0.33f,
+        conf.getFloat(TezJobConfig.TEZ_RUNTIME_INPUT_BUFFER_PERCENT, 0.0f), delta);
+    assertEquals(0.44f,
+        conf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, 0.00f), delta);
+
+    // Verify additional configs
+    boolean readAhead = conf.getBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
+        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT);
+    assertEquals(false, readAhead);
+    int readAheadBytes = conf.getInt(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES,
+        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT);
+    assertEquals(1111, readAheadBytes);
+    assertEquals(3, conf.getInt(TezJobConfig.TEZ_RUNTIME_IO_SORT_FACTOR, -1));
+    assertEquals("io", conf.get("io.shouldExist"));
+    assertEquals("file", conf.get("file.shouldExist"));
+    assertEquals("fs", conf.get("fs.shouldExist"));
+
+    // Keys with the "test." prefix must not come back after the round trip.
+    assertNull(conf.get("test.conf.key.1"));
+    assertNull(conf.get("test.key.1"));
+    assertNull(conf.get("test.key.2"));
+  }
+
+  /**
+   * Builds an input configuration with only key and value classes, round-trips it through
+   * its byte representation and checks the read-ahead flag, the input codec, the absence of
+   * the output codec key, and the key/value classes.
+   *
+   * NOTE(review): "TestCodec" is never set in this test; presumably it comes from a
+   * configuration resource on the test classpath - confirm.
+   */
+  @Test
+  public void testDefaultConfigsUsed() {
+    ShuffledMergedInputConfiguration original =
+        ShuffledMergedInputConfiguration.newBuilder("KEY", "VALUE").build();
+
+    byte[] serialized = original.toByteArray();
+    ShuffledMergedInputConfiguration roundTripped = new ShuffledMergedInputConfiguration();
+    roundTripped.fromByteArray(serialized);
+
+    Configuration conf = roundTripped.conf;
+
+    boolean readAhead = conf.getBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
+        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT);
+    assertTrue(readAhead);
+
+    // Default Input property present.
+    String inputCodec =
+        conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC, "");
+    assertEquals("TestCodec", inputCodec);
+    // Output property should be absent, so the lookup falls back to the supplied default.
+    String outputCodec =
+        conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC, "DEFAULT");
+    assertEquals("DEFAULT", outputCodec);
+
+    // Verify whatever was configured
+    assertEquals("KEY",
+        conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS, ""));
+    assertEquals("VALUE",
+        conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS, ""));
+  }
+
+  /**
+   * Passes a combiner together with its own Configuration to the builder. After a round
+   * trip through bytes, the combiner-specific key must be present, while the input codec
+   * value carried inside the combiner configuration must not replace "TestCodec".
+   */
+  @Test
+  public void testCombinerConfigs() {
+    Configuration combinerSettings = new Configuration(false);
+    combinerSettings.set("combiner.test.key", "COMBINERKEY");
+    // This value is expected to be ignored by the builder.
+    combinerSettings.set(
+        TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC, "InvalidKeyOverride");
+
+    ShuffledMergedInputConfiguration original =
+        ShuffledMergedInputConfiguration.newBuilder("KEY", "VALUE")
+            .setCombiner("COMBINER", combinerSettings)
+            .build();
+
+    byte[] serialized = original.toByteArray();
+    ShuffledMergedInputConfiguration roundTripped = new ShuffledMergedInputConfiguration();
+    roundTripped.fromByteArray(serialized);
+
+    Configuration conf = roundTripped.conf;
+
+    // The input codec must not be overridden by the value in the combiner config.
+    String inputCodec =
+        conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC, "");
+    assertEquals("TestCodec", inputCodec);
+
+    assertEquals("COMBINERKEY", conf.get("combiner.test.key"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3535b142/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestShuffledUnorderedKVInputConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestShuffledUnorderedKVInputConfiguration.java
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestShuffledUnorderedKVInputConfiguration.java
new file mode 100644
index 0000000..d4de270
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestShuffledUnorderedKVInputConfiguration.java
@@ -0,0 +1,137 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.tez.runtime.library.conf;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezJobConfig;
+import org.junit.Test;
+
+public class TestShuffledUnorderedKVInputConfiguration {
+
+  /**
+   * A null key class or a null value class passed to newBuilder must raise a
+   * NullPointerException whose message contains "cannot be null".
+   */
+  @Test
+  public void testNullParams() {
+    final String expectedFragment = "cannot be null";
+
+    // Null key class.
+    try {
+      ShuffledUnorderedKVInputConfiguration.newBuilder(null, "VALUE");
+      fail("Expecting a null parameter list to fail");
+    } catch (NullPointerException expected) {
+      assertTrue(expected.getMessage().contains(expectedFragment));
+    }
+
+    // Null value class.
+    try {
+      ShuffledUnorderedKVInputConfiguration.newBuilder("KEY", null);
+      fail("Expecting a null parameter list to fail");
+    } catch (NullPointerException expected) {
+      assertTrue(expected.getMessage().contains(expectedFragment));
+    }
+  }
+
+  /**
+   * Configures a ShuffledUnorderedKVInputConfiguration through its typed setters, the
+   * single-key and map forms of setAdditionalConfiguration, and setFromConfiguration;
+   * serializes it, rebuilds it from bytes and checks the resulting Configuration.
+   *
+   * As asserted at the end, the "test."-prefixed keys supplied through any of the three
+   * routes are not present after the round trip, while the io./file./fs. keys are.
+   */
+  @Test
+  public void testSetters() {
+    // Settings handed over as a whole Configuration object.
+    Configuration sourceConf = new Configuration(false);
+    sourceConf.set("test.conf.key.1", "confkey1");
+    sourceConf.setInt(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, 1111);
+    sourceConf.set("io.shouldExist", "io");
+
+    // Settings handed over as a map.
+    Map<String, String> extraSettings = new HashMap<String, String>();
+    extraSettings.put("test.key.2", "key2");
+    extraSettings.put(TezJobConfig.TEZ_RUNTIME_IO_SORT_FACTOR, "3");
+    extraSettings.put("file.shouldExist", "file");
+
+    ShuffledUnorderedKVInputConfiguration original =
+        ShuffledUnorderedKVInputConfiguration.newBuilder("KEY", "VALUE")
+            .enableCompression("CustomCodec")
+            .setMaxSingleMemorySegmentFraction(0.11f)
+            .setMergeFraction(0.22f)
+            .setShuffleBufferFraction(0.33f)
+            .setAdditionalConfiguration("fs.shouldExist", "fs")
+            .setAdditionalConfiguration("test.key.1", "key1")
+            .setAdditionalConfiguration(
+                TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD, String.valueOf(false))
+            .setAdditionalConfiguration(extraSettings)
+            .setFromConfiguration(sourceConf)
+            .build();
+
+    // Round trip through the byte representation.
+    byte[] serialized = original.toByteArray();
+    ShuffledUnorderedKVInputConfiguration roundTripped =
+        new ShuffledUnorderedKVInputConfiguration();
+    roundTripped.fromByteArray(serialized);
+
+    Configuration conf = roundTripped.conf;
+    final float delta = 0.001f;
+
+    // Verify programmatic API usage
+    assertEquals("KEY",
+        conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS, ""));
+    assertEquals("VALUE",
+        conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS, ""));
+    assertEquals("CustomCodec",
+        conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC, ""));
+    assertTrue(
+        conf.getBoolean(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_IS_COMPRESSED, false));
+    assertEquals(0.11f,
+        conf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT, 0.0f), delta);
+    assertEquals(0.22f,
+        conf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, 0.0f), delta);
+    assertEquals(0.33f,
+        conf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, 0.00f), delta);
+
+    // Verify additional configs
+    boolean readAhead = conf.getBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
+        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT);
+    assertEquals(false, readAhead);
+    int readAheadBytes = conf.getInt(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES,
+        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT);
+    assertEquals(1111, readAheadBytes);
+    assertEquals(3, conf.getInt(TezJobConfig.TEZ_RUNTIME_IO_SORT_FACTOR, -1));
+    assertEquals("io", conf.get("io.shouldExist"));
+    assertEquals("file", conf.get("file.shouldExist"));
+    assertEquals("fs", conf.get("fs.shouldExist"));
+
+    // Keys with the "test." prefix must not come back after the round trip.
+    assertNull(conf.get("test.conf.key.1"));
+    assertNull(conf.get("test.key.1"));
+    assertNull(conf.get("test.key.2"));
+  }
+
+  /**
+   * Builds an input configuration with only key and value classes, round-trips it through
+   * its byte representation and checks the read-ahead flag, the input codec, the absence of
+   * the output codec key, and the key/value classes.
+   *
+   * NOTE(review): "TestCodec" is never set in this test; presumably it comes from a
+   * configuration resource on the test classpath - confirm.
+   */
+  @Test
+  public void testDefaultConfigsUsed() {
+    ShuffledUnorderedKVInputConfiguration original =
+        ShuffledUnorderedKVInputConfiguration.newBuilder("KEY", "VALUE").build();
+
+    byte[] serialized = original.toByteArray();
+    ShuffledUnorderedKVInputConfiguration roundTripped =
+        new ShuffledUnorderedKVInputConfiguration();
+    roundTripped.fromByteArray(serialized);
+
+    Configuration conf = roundTripped.conf;
+
+    boolean readAhead = conf.getBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
+        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT);
+    assertTrue(readAhead);
+
+    // Default Input property present.
+    String inputCodec =
+        conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC, "");
+    assertEquals("TestCodec", inputCodec);
+    // Output property should be absent, so the lookup falls back to the supplied default.
+    String outputCodec =
+        conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC, "DEFAULT");
+    assertEquals("DEFAULT", outputCodec);
+
+    // Verify whatever was configured
+    assertEquals("KEY",
+        conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS, ""));
+    assertEquals("VALUE",
+        conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS, ""));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3535b142/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedEdgeConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedEdgeConfiguration.java
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedEdgeConfiguration.java
new file mode 100644
index 0000000..23acf04
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedEdgeConfiguration.java
@@ -0,0 +1,205 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.tez.runtime.library.conf;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezJobConfig;
+import org.junit.Test;
+
+/**
+ * Tests for {@link UnorderedPartitionedKVEdgeConfiguration}: validation of builder arguments,
+ * and the contents of the Output and Input payloads generated for an edge.
+ *
+ * <p>Each payload test rebuilds the Output / Input configuration objects from the serialized
+ * payload bytes and asserts against the rebuilt {@link Configuration}.
+ */
+public class TestUnorderedPartitionedEdgeConfiguration {
+
+  /** build() is expected to fail when the Output (and so the partitioner) was not configured. */
+  @Test
+  public void testIncompleteParameters() {
+    UnorderedPartitionedKVEdgeConfiguration.Builder builder =
+        UnorderedPartitionedKVEdgeConfiguration.newBuilder("KEY", "VALUE");
+    try {
+      builder.build();
+      // fail() raises an AssertionError, which the catch (Exception) below does not intercept.
+      fail("Should have failed since the partitioner has not been specified");
+    } catch (Exception e) {
+      assertTrue(e.getMessage().contains("Output must be configured - partitioner"));
+    }
+  }
+
+  /** A null key class, value class or partitioner must be rejected with a descriptive NPE. */
+  @Test
+  public void testNullParams() {
+    try {
+      UnorderedPartitionedKVEdgeConfiguration.newBuilder(null, "VALUE");
+      fail("Expecting a null parameter list to fail");
+    } catch (NullPointerException npe) {
+      assertTrue(npe.getMessage().contains("cannot be null"));
+    }
+
+    try {
+      UnorderedPartitionedKVEdgeConfiguration.newBuilder("KEY", null);
+      fail("Expecting a null parameter list to fail");
+    } catch (NullPointerException npe) {
+      assertTrue(npe.getMessage().contains("cannot be null"));
+    }
+
+    try {
+      UnorderedPartitionedKVEdgeConfiguration.newBuilder("KEY", "VALUE")
+          .configureOutput(null, null);
+      fail("Expecting a null parameter list to fail");
+    } catch (NullPointerException npe) {
+      assertTrue(npe.getMessage().contains("cannot be null"));
+    }
+  }
+
+  /**
+   * With nothing but the mandatory parameters set, both payloads should carry the readahead
+   * flag and the codec for their own side. The expected values match the test-resources
+   * tez-site.xml.
+   */
+  @Test
+  public void testDefaultConfigsUsed() {
+    UnorderedPartitionedKVEdgeConfiguration.Builder builder =
+        UnorderedPartitionedKVEdgeConfiguration.newBuilder("KEY", "VALUE")
+            .configureOutput("PARTITIONER", null).done();
+
+    UnorderedPartitionedKVEdgeConfiguration configuration = builder.build();
+
+    Configuration outputConf = rebuildOutputConf(configuration);
+    assertTrue(outputConf.getBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
+        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT));
+    assertEquals("TestCodec",
+        outputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC, ""));
+
+    Configuration inputConf = rebuildInputConf(configuration);
+    assertTrue(inputConf.getBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
+        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT));
+    assertEquals("TestCodec",
+        inputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC, ""));
+  }
+
+  /** Ensures that Output and Input confs are not mixed. */
+  @Test
+  public void testSpecificIOConfs() {
+    UnorderedPartitionedKVEdgeConfiguration.Builder builder =
+        UnorderedPartitionedKVEdgeConfiguration.newBuilder("KEY", "VALUE")
+            .configureOutput("PARTITIONER", null).done();
+
+    UnorderedPartitionedKVEdgeConfiguration configuration = builder.build();
+
+    // The Output payload must not contain the Input-side codec key ...
+    Configuration outputConf = rebuildOutputConf(configuration);
+    assertEquals("DEFAULT",
+        outputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC, "DEFAULT"));
+
+    // ... and the Input payload must not contain the Output-side codec key.
+    Configuration inputConf = rebuildInputConf(configuration);
+    assertEquals("DEFAULT",
+        inputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC, "DEFAULT"));
+  }
+
+  /**
+   * Supplies settings through all three builder routes (single key/value, a map, and a source
+   * {@link Configuration}) and checks which of them end up in the Output and Input payloads.
+   */
+  @Test
+  public void testCommonConf() {
+
+    Configuration fromConf = new Configuration(false);
+    fromConf.set("test.conf.key.1", "confkey1");
+    fromConf.setBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD, false);
+    fromConf.setFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, 0.11f);
+    fromConf.setInt(TezJobConfig.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB, 123);
+    fromConf.set("io.shouldExist", "io");
+    Map<String, String> additionalConfs = new HashMap<String, String>();
+    additionalConfs.put("test.key.2", "key2");
+    additionalConfs.put(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, "1111");
+    additionalConfs.put(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT, "0.22f");
+    additionalConfs
+        .put(TezJobConfig.TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES, "2222");
+    additionalConfs.put("file.shouldExist", "file");
+
+    UnorderedPartitionedKVEdgeConfiguration.Builder builder =
+        UnorderedPartitionedKVEdgeConfiguration.newBuilder("KEY", "VALUE")
+            .configureOutput("PARTITIONER", null).done()
+            .setAdditionalConfiguration("fs.shouldExist", "fs")
+            .setAdditionalConfiguration("test.key.1", "key1")
+            .setAdditionalConfiguration(TezJobConfig.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE, "3333")
+            .setAdditionalConfiguration(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, "0.33f")
+            .setAdditionalConfiguration(additionalConfs)
+            .setFromConfiguration(fromConf);
+
+    UnorderedPartitionedKVEdgeConfiguration configuration = builder.build();
+
+    Configuration outputConf = rebuildOutputConf(configuration);
+    Configuration inputConf = rebuildInputConf(configuration);
+
+    // Output side: shared and Output-specific keys present, shuffle (Input) keys absent.
+    assertEquals(false, outputConf.getBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD, true));
+    assertEquals(1111,
+        outputConf.getInt(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, 0));
+    assertEquals(3333, outputConf.getInt(TezJobConfig.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE, 0));
+    assertNull(outputConf.get(TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT));
+    assertNull(outputConf.get(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT));
+    assertNull(outputConf.get(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT));
+    assertEquals(123,
+        outputConf.getInt(TezJobConfig.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB, 0));
+    assertEquals(2222,
+        outputConf.getInt(
+            TezJobConfig.TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES, 0));
+    assertEquals("io", outputConf.get("io.shouldExist"));
+    assertEquals("file", outputConf.get("file.shouldExist"));
+    assertEquals("fs", outputConf.get("fs.shouldExist"));
+
+    // Input side: shared and shuffle keys present, unordered-output keys absent.
+    assertEquals(false, inputConf.getBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD, true));
+    assertEquals(1111, inputConf.getInt(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, 0));
+    assertEquals(3333, inputConf.getInt(TezJobConfig.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE, 0));
+    assertEquals(0.11f,
+        inputConf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, 0.0f),
+        0.001f);
+    assertEquals(0.22f,
+        inputConf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT, 0.0f),
+        0.001f);
+    assertEquals(0.33f,
+        inputConf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, 0.0f),
+        0.001f);
+    assertNull(inputConf.get(TezJobConfig.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB));
+    assertNull(
+        inputConf.get(TezJobConfig.TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES));
+    assertEquals("io", inputConf.get("io.shouldExist"));
+    assertEquals("file", inputConf.get("file.shouldExist"));
+    assertEquals("fs", inputConf.get("fs.shouldExist"));
+
+  }
+
+  /** Rebuilds the Output-side Configuration from the edge's serialized Output payload. */
+  private static Configuration rebuildOutputConf(
+      UnorderedPartitionedKVEdgeConfiguration edgeConf) {
+    OnFileUnorderedPartitionedKVOutputConfiguration rebuiltOutput =
+        new OnFileUnorderedPartitionedKVOutputConfiguration();
+    rebuiltOutput.fromByteArray(edgeConf.getOutputPayload());
+    return rebuiltOutput.conf;
+  }
+
+  /** Rebuilds the Input-side Configuration from the edge's serialized Input payload. */
+  private static Configuration rebuildInputConf(
+      UnorderedPartitionedKVEdgeConfiguration edgeConf) {
+    ShuffledUnorderedKVInputConfiguration rebuiltInput =
+        new ShuffledUnorderedKVInputConfiguration();
+    rebuiltInput.fromByteArray(edgeConf.getInputPayload());
+    return rebuiltInput.conf;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3535b142/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedUnpartitionedEdgeConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedUnpartitionedEdgeConfiguration.java
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedUnpartitionedEdgeConfiguration.java
new file mode 100644
index 0000000..1fbe653
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedUnpartitionedEdgeConfiguration.java
@@ -0,0 +1,175 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.tez.runtime.library.conf;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezJobConfig;
+import org.junit.Test;
+
+/**
+ * Tests for {@link UnorderedUnpartitionedKVEdgeConfiguration}: validation of builder arguments,
+ * and the contents of the Output and Input payloads generated for an edge.
+ *
+ * <p>Each payload test rebuilds the Output / Input configuration objects from the serialized
+ * payload bytes and asserts against the rebuilt {@link Configuration}.
+ */
+public class TestUnorderedUnpartitionedEdgeConfiguration {
+
+  /** A null key class or value class must be rejected with a descriptive NPE. */
+  @Test
+  public void testNullParams() {
+    try {
+      UnorderedUnpartitionedKVEdgeConfiguration.newBuilder(null, "VALUE");
+      // fail() raises an AssertionError, so it is not swallowed by the NPE catch below.
+      fail("Expecting a null parameter list to fail");
+    } catch (NullPointerException npe) {
+      assertTrue(npe.getMessage().contains("cannot be null"));
+    }
+
+    try {
+      UnorderedUnpartitionedKVEdgeConfiguration.newBuilder("KEY", null);
+      fail("Expecting a null parameter list to fail");
+    } catch (NullPointerException npe) {
+      assertTrue(npe.getMessage().contains("cannot be null"));
+    }
+  }
+
+  /**
+   * With nothing but the mandatory parameters set, both payloads should carry the readahead
+   * flag and the codec for their own side. The expected values match the test-resources
+   * tez-site.xml.
+   */
+  @Test
+  public void testDefaultConfigsUsed() {
+    UnorderedUnpartitionedKVEdgeConfiguration.Builder builder =
+        UnorderedUnpartitionedKVEdgeConfiguration.newBuilder("KEY", "VALUE");
+
+    UnorderedUnpartitionedKVEdgeConfiguration configuration = builder.build();
+
+    Configuration outputConf = rebuildOutputConf(configuration);
+    assertTrue(outputConf.getBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
+        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT));
+    assertEquals("TestCodec",
+        outputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC, ""));
+
+    Configuration inputConf = rebuildInputConf(configuration);
+    assertTrue(inputConf.getBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
+        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT));
+    assertEquals("TestCodec",
+        inputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC, ""));
+  }
+
+  /** Ensures that Output and Input confs are not mixed. */
+  @Test
+  public void testSpecificIOConfs() {
+    UnorderedUnpartitionedKVEdgeConfiguration.Builder builder =
+        UnorderedUnpartitionedKVEdgeConfiguration.newBuilder("KEY", "VALUE");
+
+    UnorderedUnpartitionedKVEdgeConfiguration configuration = builder.build();
+
+    // The Output payload must not contain the Input-side codec key ...
+    Configuration outputConf = rebuildOutputConf(configuration);
+    assertEquals("DEFAULT",
+        outputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC, "DEFAULT"));
+
+    // ... and the Input payload must not contain the Output-side codec key.
+    Configuration inputConf = rebuildInputConf(configuration);
+    assertEquals("DEFAULT",
+        inputConf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC, "DEFAULT"));
+  }
+
+  /**
+   * Supplies settings through all three builder routes (single key/value, a map, and a source
+   * {@link Configuration}) and checks which of them end up in the Output and Input payloads.
+   */
+  @Test
+  public void testCommonConf() {
+
+    Configuration fromConf = new Configuration(false);
+    fromConf.set("test.conf.key.1", "confkey1");
+    fromConf.setBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD, false);
+    fromConf.setFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, 0.11f);
+    fromConf.set("io.shouldExist", "io");
+    Map<String, String> additionalConfs = new HashMap<String, String>();
+    additionalConfs.put("test.key.2", "key2");
+    additionalConfs.put(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, "1111");
+    additionalConfs.put(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT, "0.22f");
+    additionalConfs.put("file.shouldExist", "file");
+
+    UnorderedUnpartitionedKVEdgeConfiguration.Builder builder =
+        UnorderedUnpartitionedKVEdgeConfiguration.newBuilder("KEY", "VALUE")
+            .setAdditionalConfiguration("fs.shouldExist", "fs")
+            .setAdditionalConfiguration("test.key.1", "key1")
+            .setAdditionalConfiguration(TezJobConfig.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE, "3333")
+            .setAdditionalConfiguration(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, "0.33f")
+            .setAdditionalConfiguration(additionalConfs)
+            .setFromConfiguration(fromConf);
+
+    UnorderedUnpartitionedKVEdgeConfiguration configuration = builder.build();
+
+    Configuration outputConf = rebuildOutputConf(configuration);
+    Configuration inputConf = rebuildInputConf(configuration);
+
+    // Output side: shared keys present, shuffle (Input) keys absent.
+    assertEquals(false, outputConf.getBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD, true));
+    assertEquals(1111,
+        outputConf.getInt(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, 0));
+    assertEquals(3333, outputConf.getInt(TezJobConfig.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE, 0));
+    assertNull(outputConf.get(TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT));
+    assertNull(outputConf.get(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT));
+    assertNull(outputConf.get(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT));
+    assertEquals("io", outputConf.get("io.shouldExist"));
+    assertEquals("file", outputConf.get("file.shouldExist"));
+    assertEquals("fs", outputConf.get("fs.shouldExist"));
+
+    // Input side: shared keys and shuffle keys present.
+    assertEquals(false, inputConf.getBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD, true));
+    assertEquals(1111, inputConf.getInt(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, 0));
+    assertEquals(3333, inputConf.getInt(TezJobConfig.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE, 0));
+    assertEquals(0.11f,
+        inputConf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, 0.0f),
+        0.001f);
+    assertEquals(0.22f,
+        inputConf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT, 0.0f),
+        0.001f);
+    assertEquals(0.33f,
+        inputConf.getFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, 0.0f),
+        0.001f);
+    assertEquals("io", inputConf.get("io.shouldExist"));
+    assertEquals("file", inputConf.get("file.shouldExist"));
+    assertEquals("fs", inputConf.get("fs.shouldExist"));
+
+  }
+
+  /** Rebuilds the Output-side Configuration from the edge's serialized Output payload. */
+  private static Configuration rebuildOutputConf(
+      UnorderedUnpartitionedKVEdgeConfiguration edgeConf) {
+    OnFileUnorderedKVOutputConfiguration rebuiltOutput =
+        new OnFileUnorderedKVOutputConfiguration();
+    rebuiltOutput.fromByteArray(edgeConf.getOutputPayload());
+    return rebuiltOutput.conf;
+  }
+
+  /** Rebuilds the Input-side Configuration from the edge's serialized Input payload. */
+  private static Configuration rebuildInputConf(
+      UnorderedUnpartitionedKVEdgeConfiguration edgeConf) {
+    ShuffledUnorderedKVInputConfiguration rebuiltInput =
+        new ShuffledUnorderedKVInputConfiguration();
+    rebuiltInput.fromByteArray(edgeConf.getInputPayload());
+    return rebuiltInput.conf;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3535b142/tez-runtime-library/src/test/resources/tez-site.xml
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/resources/tez-site.xml b/tez-runtime-library/src/test/resources/tez-site.xml
new file mode 100644
index 0000000..399bfab
--- /dev/null
+++ b/tez-runtime-library/src/test/resources/tez-site.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<configuration>
+  <property>
+    <name>tez.runtime.ifile.readahead</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>tez.runtime.intermediate-output.compress.codec</name>
+    <value>TestCodec</value>
+  </property>
+
+  <property>
+    <name>tez.runtime.intermediate-input.compress.codec</name>
+    <value>TestCodec</value>
+  </property>
+</configuration>


Mime
View raw message