tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [1/6] TEZ-1417. Rename *Configurer to ConfigBuilder/Config. (sseth)
Date Tue, 19 Aug 2014 00:02:07 GMT
Repository: tez
Updated Branches:
  refs/heads/master 37ac50aed -> b526ed5c1


http://git-wip-us.apache.org/repos/asf/tez/blob/b526ed5c/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVEdgeConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVEdgeConfig.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVEdgeConfig.java
new file mode 100644
index 0000000..772c2e9
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVEdgeConfig.java
@@ -0,0 +1,188 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.tez.runtime.library.conf;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.junit.Test;
+
+public class TestUnorderedPartitionedKVEdgeConfig {
+
+  @Test
+  public void testNullParams() {
+    try {
+      UnorderedPartitionedKVEdgeConfig.newBuilder(null, "VALUE", "PARTITIONER");
+      fail("Expecting a null parameter list to fail");
+    } catch (NullPointerException npe) {
+      assertTrue(npe.getMessage().contains("cannot be null"));
+    }
+
+    try {
+      UnorderedPartitionedKVEdgeConfig.newBuilder("KEY", null, "PARTITIONER");
+      fail("Expecting a null parameter list to fail");
+    } catch (NullPointerException npe) {
+      assertTrue(npe.getMessage().contains("cannot be null"));
+    }
+
+    try {
+      UnorderedPartitionedKVEdgeConfig.newBuilder("KEY", "VALUE", null);
+      fail("Expecting a null parameter list to fail");
+    } catch (NullPointerException npe) {
+      assertTrue(npe.getMessage().contains("cannot be null"));
+    }
+  }
+
+  @Test
+  public void testDefaultConfigsUsed() {
+    UnorderedPartitionedKVEdgeConfig.Builder builder =
+        UnorderedPartitionedKVEdgeConfig.newBuilder("KEY", "VALUE", "PARTITIONER");
+    builder.setKeySerializationClass("SerClass1", null);
+    builder.setValueSerializationClass("SerClass2", null);
+
+    UnorderedPartitionedKVEdgeConfig configuration = builder.build();
+
+    UnorderedPartitionedKVOutputConfig rebuiltOutput =
+        new UnorderedPartitionedKVOutputConfig();
+    rebuiltOutput.fromUserPayload(configuration.getOutputPayload());
+    UnorderedKVInputConfig rebuiltInput =
+        new UnorderedKVInputConfig();
+    rebuiltInput.fromUserPayload(configuration.getInputPayload());
+
+    Configuration outputConf = rebuiltOutput.conf;
+    assertEquals(true, outputConf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD,
+        TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT));
+    assertEquals("TestCodec",
+        outputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, ""));
+    assertTrue(outputConf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY).startsWith
+        ("SerClass2,SerClass1"));
+
+    Configuration inputConf = rebuiltInput.conf;
+    assertEquals(true, inputConf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD,
+        TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT));
+    assertEquals("TestCodec",
+        inputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, ""));
+    assertTrue(inputConf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY).startsWith
+        ("SerClass2,SerClass1"));
+  }
+
+  @Test
+  public void testSpecificIOConfs() {
+    // Ensures that Output and Input confs are not mixed.
+    UnorderedPartitionedKVEdgeConfig.Builder builder =
+        UnorderedPartitionedKVEdgeConfig.newBuilder("KEY", "VALUE", "PARTITIONER");
+
+    UnorderedPartitionedKVEdgeConfig configuration = builder.build();
+
+    UnorderedPartitionedKVOutputConfig rebuiltOutput =
+        new UnorderedPartitionedKVOutputConfig();
+    rebuiltOutput.fromUserPayload(configuration.getOutputPayload());
+    UnorderedKVInputConfig rebuiltInput =
+        new UnorderedKVInputConfig();
+    rebuiltInput.fromUserPayload(configuration.getInputPayload());
+
+    Configuration outputConf = rebuiltOutput.conf;
+    assertEquals("TestCodec",
+        outputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, "DEFAULT"));
+
+    Configuration inputConf = rebuiltInput.conf;
+    assertEquals("TestCodec",
+        inputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, "DEFAULT"));
+  }
+
+  @Test
+  public void tetCommonConf() {
+
+    Configuration fromConf = new Configuration(false);
+    fromConf.set("test.conf.key.1", "confkey1");
+    fromConf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD, false);
+    fromConf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, 0.11f);
+    fromConf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB, 123);
+    fromConf.set("io.shouldExist", "io");
+    Map<String, String> additionalConfs = new HashMap<String, String>();
+    additionalConfs.put("test.key.2", "key2");
+    additionalConfs.put(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, "1111");
+    additionalConfs.put(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT, "0.22f");
+    additionalConfs
+        .put(TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES, "2222");
+    additionalConfs.put("file.shouldExist", "file");
+
+    UnorderedPartitionedKVEdgeConfig.Builder builder = UnorderedPartitionedKVEdgeConfig
+        .newBuilder("KEY", "VALUE", "PARTITIONER")
+        .setAdditionalConfiguration("fs.shouldExist", "fs")
+        .setAdditionalConfiguration("test.key.1", "key1")
+        .setAdditionalConfiguration(TezRuntimeConfiguration.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE, "3333")
+        .setAdditionalConfiguration(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, "0.33f")
+        .setAdditionalConfiguration(additionalConfs)
+        .setFromConfiguration(fromConf);
+
+    UnorderedPartitionedKVEdgeConfig configuration = builder.build();
+
+    UnorderedPartitionedKVOutputConfig rebuiltOutput =
+        new UnorderedPartitionedKVOutputConfig();
+    rebuiltOutput.fromUserPayload(configuration.getOutputPayload());
+    UnorderedKVInputConfig rebuiltInput =
+        new UnorderedKVInputConfig();
+    rebuiltInput.fromUserPayload(configuration.getInputPayload());
+
+    Configuration outputConf = rebuiltOutput.conf;
+    Configuration inputConf = rebuiltInput.conf;
+
+    assertEquals(false, outputConf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD, true));
+    assertEquals(1111, outputConf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, 0));
+    assertEquals(3333, outputConf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE, 0));
+    assertNull(outputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT));
+    assertNull(outputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT));
+    assertNull(outputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT));
+    assertEquals(123,
+        outputConf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB, 0));
+    assertEquals(2222,
+        outputConf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES, 0));
+    assertEquals("io", outputConf.get("io.shouldExist"));
+    assertEquals("file", outputConf.get("file.shouldExist"));
+    assertEquals("fs", outputConf.get("fs.shouldExist"));
+
+
+    assertEquals(false, inputConf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD, true));
+    assertEquals(1111, inputConf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, 0));
+    assertEquals(3333, inputConf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE, 0));
+    assertEquals(0.11f,
+        inputConf.getFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, 0.0f), 0.001f);
+    assertEquals(0.22f,
+        inputConf.getFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT, 0.0f), 0.001f);
+    assertEquals(0.33f, inputConf.getFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, 0.0f),
+        0.001f);
+    assertNull(inputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB));
+    assertNull(inputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES));
+    assertEquals("io", inputConf.get("io.shouldExist"));
+    assertEquals("file", inputConf.get("file.shouldExist"));
+    assertEquals("fs", inputConf.get("fs.shouldExist"));
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/b526ed5c/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVEdgeConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVEdgeConfigurer.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVEdgeConfigurer.java
deleted file mode 100644
index 796a5b0..0000000
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVEdgeConfigurer.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * *
- *  * Licensed to the Apache Software Foundation (ASF) under one
- *  * or more contributor license agreements.  See the NOTICE file
- *  * distributed with this work for additional information
- *  * regarding copyright ownership.  The ASF licenses this file
- *  * to you under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance
- *  * with the License.  You may obtain a copy of the License at
- *  *
- *  *     http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.tez.runtime.library.conf;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
-import org.junit.Test;
-
-public class TestUnorderedPartitionedKVEdgeConfigurer {
-
-  @Test
-  public void testNullParams() {
-    try {
-      UnorderedPartitionedKVEdgeConfigurer.newBuilder(null, "VALUE", "PARTITIONER");
-      fail("Expecting a null parameter list to fail");
-    } catch (NullPointerException npe) {
-      assertTrue(npe.getMessage().contains("cannot be null"));
-    }
-
-    try {
-      UnorderedPartitionedKVEdgeConfigurer.newBuilder("KEY", null, "PARTITIONER");
-      fail("Expecting a null parameter list to fail");
-    } catch (NullPointerException npe) {
-      assertTrue(npe.getMessage().contains("cannot be null"));
-    }
-
-    try {
-      UnorderedPartitionedKVEdgeConfigurer.newBuilder("KEY", "VALUE", null);
-      fail("Expecting a null parameter list to fail");
-    } catch (NullPointerException npe) {
-      assertTrue(npe.getMessage().contains("cannot be null"));
-    }
-  }
-
-  @Test
-  public void testDefaultConfigsUsed() {
-    UnorderedPartitionedKVEdgeConfigurer.Builder builder =
-        UnorderedPartitionedKVEdgeConfigurer.newBuilder("KEY", "VALUE", "PARTITIONER");
-    builder.setKeySerializationClass("SerClass1", null);
-    builder.setValueSerializationClass("SerClass2", null);
-
-    UnorderedPartitionedKVEdgeConfigurer configuration = builder.build();
-
-    UnorderedPartitionedKVOutputConfigurer rebuiltOutput =
-        new UnorderedPartitionedKVOutputConfigurer();
-    rebuiltOutput.fromUserPayload(configuration.getOutputPayload());
-    UnorderedKVInputConfigurer rebuiltInput =
-        new UnorderedKVInputConfigurer();
-    rebuiltInput.fromUserPayload(configuration.getInputPayload());
-
-    Configuration outputConf = rebuiltOutput.conf;
-    assertEquals(true, outputConf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD,
-        TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT));
-    assertEquals("TestCodec",
-        outputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, ""));
-    assertTrue(outputConf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY).startsWith
-        ("SerClass2,SerClass1"));
-
-    Configuration inputConf = rebuiltInput.conf;
-    assertEquals(true, inputConf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD,
-        TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT));
-    assertEquals("TestCodec",
-        inputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, ""));
-    assertTrue(inputConf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY).startsWith
-        ("SerClass2,SerClass1"));
-  }
-
-  @Test
-  public void testSpecificIOConfs() {
-    // Ensures that Output and Input confs are not mixed.
-    UnorderedPartitionedKVEdgeConfigurer.Builder builder =
-        UnorderedPartitionedKVEdgeConfigurer.newBuilder("KEY", "VALUE", "PARTITIONER");
-
-    UnorderedPartitionedKVEdgeConfigurer configuration = builder.build();
-
-    UnorderedPartitionedKVOutputConfigurer rebuiltOutput =
-        new UnorderedPartitionedKVOutputConfigurer();
-    rebuiltOutput.fromUserPayload(configuration.getOutputPayload());
-    UnorderedKVInputConfigurer rebuiltInput =
-        new UnorderedKVInputConfigurer();
-    rebuiltInput.fromUserPayload(configuration.getInputPayload());
-
-    Configuration outputConf = rebuiltOutput.conf;
-    assertEquals("TestCodec",
-        outputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, "DEFAULT"));
-
-    Configuration inputConf = rebuiltInput.conf;
-    assertEquals("TestCodec",
-        inputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, "DEFAULT"));
-  }
-
-  @Test
-  public void tetCommonConf() {
-
-    Configuration fromConf = new Configuration(false);
-    fromConf.set("test.conf.key.1", "confkey1");
-    fromConf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD, false);
-    fromConf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, 0.11f);
-    fromConf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB, 123);
-    fromConf.set("io.shouldExist", "io");
-    Map<String, String> additionalConfs = new HashMap<String, String>();
-    additionalConfs.put("test.key.2", "key2");
-    additionalConfs.put(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, "1111");
-    additionalConfs.put(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT, "0.22f");
-    additionalConfs
-        .put(TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES, "2222");
-    additionalConfs.put("file.shouldExist", "file");
-
-    UnorderedPartitionedKVEdgeConfigurer.Builder builder = UnorderedPartitionedKVEdgeConfigurer
-        .newBuilder("KEY", "VALUE", "PARTITIONER")
-        .setAdditionalConfiguration("fs.shouldExist", "fs")
-        .setAdditionalConfiguration("test.key.1", "key1")
-        .setAdditionalConfiguration(TezRuntimeConfiguration.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE, "3333")
-        .setAdditionalConfiguration(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, "0.33f")
-        .setAdditionalConfiguration(additionalConfs)
-        .setFromConfiguration(fromConf);
-
-    UnorderedPartitionedKVEdgeConfigurer configuration = builder.build();
-
-    UnorderedPartitionedKVOutputConfigurer rebuiltOutput =
-        new UnorderedPartitionedKVOutputConfigurer();
-    rebuiltOutput.fromUserPayload(configuration.getOutputPayload());
-    UnorderedKVInputConfigurer rebuiltInput =
-        new UnorderedKVInputConfigurer();
-    rebuiltInput.fromUserPayload(configuration.getInputPayload());
-
-    Configuration outputConf = rebuiltOutput.conf;
-    Configuration inputConf = rebuiltInput.conf;
-
-    assertEquals(false, outputConf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD, true));
-    assertEquals(1111, outputConf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, 0));
-    assertEquals(3333, outputConf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE, 0));
-    assertNull(outputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT));
-    assertNull(outputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT));
-    assertNull(outputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT));
-    assertEquals(123,
-        outputConf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB, 0));
-    assertEquals(2222,
-        outputConf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES, 0));
-    assertEquals("io", outputConf.get("io.shouldExist"));
-    assertEquals("file", outputConf.get("file.shouldExist"));
-    assertEquals("fs", outputConf.get("fs.shouldExist"));
-
-
-    assertEquals(false, inputConf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD, true));
-    assertEquals(1111, inputConf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, 0));
-    assertEquals(3333, inputConf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE, 0));
-    assertEquals(0.11f,
-        inputConf.getFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, 0.0f), 0.001f);
-    assertEquals(0.22f,
-        inputConf.getFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT, 0.0f), 0.001f);
-    assertEquals(0.33f, inputConf.getFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, 0.0f),
-        0.001f);
-    assertNull(inputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB));
-    assertNull(inputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES));
-    assertEquals("io", inputConf.get("io.shouldExist"));
-    assertEquals("file", inputConf.get("file.shouldExist"));
-    assertEquals("fs", inputConf.get("fs.shouldExist"));
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/b526ed5c/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVOutputConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVOutputConfig.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVOutputConfig.java
new file mode 100644
index 0000000..1b7c65f
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVOutputConfig.java
@@ -0,0 +1,177 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.tez.runtime.library.conf;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.junit.Test;
+
+public class TestUnorderedPartitionedKVOutputConfig {
+
+  @Test
+  public void testNullParams() {
+    try {
+      UnorderedPartitionedKVOutputConfig.newBuilder(
+          null, "VALUE", "PARTITIONER", null);
+      fail("Expecting a null parameter list to fail");
+    } catch (NullPointerException npe) {
+      assertTrue(npe.getMessage().contains("cannot be null"));
+    }
+
+    try {
+      UnorderedPartitionedKVOutputConfig.newBuilder(
+          "KEY", null, "PARTITIONER", null);
+      fail("Expecting a null parameter list to fail");
+    } catch (NullPointerException npe) {
+      assertTrue(npe.getMessage().contains("cannot be null"));
+    }
+
+    try {
+      UnorderedPartitionedKVOutputConfig.newBuilder(
+          "KEY", "VALUE", null, null);
+      fail("Expecting a null parameter list to fail");
+    } catch (NullPointerException npe) {
+      assertTrue(npe.getMessage().contains("cannot be null"));
+    }
+  }
+
+  @Test
+  public void testSetters() {
+    Configuration fromConf = new Configuration(false);
+    fromConf.set("test.conf.key.1", "confkey1");
+    fromConf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, 1111);
+    fromConf.set("io.shouldExist", "io");
+    Map<String, String> additionalConf = new HashMap<String, String>();
+    additionalConf.put("test.key.2", "key2");
+    additionalConf.put(TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES, "2222");
+    additionalConf.put("file.shouldExist", "file");
+    UnorderedPartitionedKVOutputConfig.Builder builder =
+        UnorderedPartitionedKVOutputConfig.newBuilder("KEY", "VALUE", "PARTITIONER",
+            null)
+            .setCompression(true, "CustomCodec", null)
+            .setAvailableBufferSize(1111)
+            .setAdditionalConfiguration("fs.shouldExist", "fs")
+            .setAdditionalConfiguration("test.key.1", "key1")
+            .setAdditionalConfiguration(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD,
+                String.valueOf(false))
+            .setAdditionalConfiguration(additionalConf)
+            .setFromConfiguration(fromConf);
+
+    UnorderedPartitionedKVOutputConfig configuration = builder.build();
+
+    UnorderedPartitionedKVOutputConfig rebuilt =
+        new UnorderedPartitionedKVOutputConfig();
+    rebuilt.fromUserPayload(configuration.toUserPayload());
+
+    Configuration conf = rebuilt.conf;
+
+    // Verify programmatic API usage
+    assertEquals(1111, conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB, 0));
+    assertEquals("KEY", conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, ""));
+    assertEquals("VALUE", conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, ""));
+    assertEquals("PARTITIONER", conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, ""));
+    assertEquals("CustomCodec",
+        conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, ""));
+    assertEquals(true, conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS,
+        false));
+
+    // Verify additional configs
+    assertEquals(false, conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD,
+        TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT));
+    assertEquals(1111, conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES,
+        TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT));
+    assertEquals(2222,
+        conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES, 0));
+    assertEquals("io", conf.get("io.shouldExist"));
+    assertEquals("file", conf.get("file.shouldExist"));
+    assertEquals("fs", conf.get("fs.shouldExist"));
+    assertNull(conf.get("test.conf.key.1"));
+    assertNull(conf.get("test.key.1"));
+    assertNull(conf.get("test.key.2"));
+  }
+
+  @Test
+  public void testDefaultConfigsUsed() {
+    UnorderedPartitionedKVOutputConfig.Builder builder =
+        UnorderedPartitionedKVOutputConfig
+            .newBuilder("KEY", "VALUE", "PARTITIONER", null)
+            .setKeySerializationClass("SerClass1", null)
+            .setValueSerializationClass("SerClass2", null);
+    UnorderedPartitionedKVOutputConfig configuration = builder.build();
+
+    UnorderedPartitionedKVOutputConfig rebuilt =
+        new UnorderedPartitionedKVOutputConfig();
+    rebuilt.fromUserPayload(configuration.toUserPayload());
+
+    Configuration conf = rebuilt.conf;
+
+    assertEquals(true, conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD,
+        TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT));
+
+    // Default property present.
+    assertEquals("TestCodec",
+        conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, ""));
+
+    // Verify whatever was configured
+    assertEquals("KEY", conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, ""));
+    assertEquals("VALUE", conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, ""));
+    assertEquals("PARTITIONER", conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, ""));
+    assertTrue(conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY).startsWith("SerClass2," +
+        "SerClass1"));
+    //for unordered partitioned kv output, comparator is not populated
+    assertNull(conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS));
+  }
+
+  @Test
+  public void testPartitionerConfigs() {
+    Map<String, String> partitionerConf = Maps.newHashMap();
+    partitionerConf.put("partitioner.test.key", "PARTITIONERKEY");
+    partitionerConf
+        .put(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, "InvalidKeyOverride");
+    UnorderedPartitionedKVOutputConfig.Builder builder =
+        UnorderedPartitionedKVOutputConfig
+            .newBuilder("KEY", "VALUE", "PARTITIONER", partitionerConf);
+
+    UnorderedPartitionedKVOutputConfig configuration = builder.build();
+
+    UnorderedPartitionedKVOutputConfig rebuilt =
+        new UnorderedPartitionedKVOutputConfig();
+    rebuilt.fromUserPayload(configuration.toUserPayload());
+
+    Configuration conf = rebuilt.conf;
+
+    // Default Output property should not be overridden based on partitioner config
+    assertEquals("TestCodec",
+        conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, ""));
+
+    assertEquals("PARTITIONERKEY", conf.get("partitioner.test.key"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/b526ed5c/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedUnpartitionedKVEdgeConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedUnpartitionedKVEdgeConfigurer.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedUnpartitionedKVEdgeConfigurer.java
deleted file mode 100644
index 2215557..0000000
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedUnpartitionedKVEdgeConfigurer.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * *
- *  * Licensed to the Apache Software Foundation (ASF) under one
- *  * or more contributor license agreements.  See the NOTICE file
- *  * distributed with this work for additional information
- *  * regarding copyright ownership.  The ASF licenses this file
- *  * to you under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance
- *  * with the License.  You may obtain a copy of the License at
- *  *
- *  *     http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.tez.runtime.library.conf;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
-import org.junit.Test;
-
-public class TestUnorderedUnpartitionedKVEdgeConfigurer {
-
-  @Test
-  public void testNullParams() {
-    try {
-      UnorderedKVEdgeConfigurer.newBuilder(null, "VALUE");
-      fail("Expecting a null parameter list to fail");
-    } catch (NullPointerException npe) {
-      assertTrue(npe.getMessage().contains("cannot be null"));
-    }
-
-    try {
-      UnorderedKVEdgeConfigurer.newBuilder("KEY", null);
-      fail("Expecting a null parameter list to fail");
-    } catch (NullPointerException npe) {
-      assertTrue(npe.getMessage().contains("cannot be null"));
-    }
-  }
-
-  @Test
-  public void testDefaultConfigsUsed() {
-    UnorderedKVEdgeConfigurer.Builder builder =
-        UnorderedKVEdgeConfigurer.newBuilder("KEY", "VALUE");
-    builder.setKeySerializationClass("SerClass1", null).setValueSerializationClass("SerClass2", null);
-
-    UnorderedKVEdgeConfigurer configuration = builder.build();
-
-
-    UnorderedKVOutputConfigurer rebuiltOutput =
-        new UnorderedKVOutputConfigurer();
-    rebuiltOutput.fromUserPayload(configuration.getOutputPayload());
-    UnorderedKVInputConfigurer rebuiltInput =
-        new UnorderedKVInputConfigurer();
-    rebuiltInput.fromUserPayload(configuration.getInputPayload());
-
-    Configuration outputConf = rebuiltOutput.conf;
-    assertEquals(true, outputConf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD,
-        TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT));
-    assertEquals("TestCodec",
-        outputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, ""));
-    assertTrue(outputConf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY).startsWith
-        ("SerClass2,SerClass1"));
-
-    Configuration inputConf = rebuiltInput.conf;
-    assertEquals(true, inputConf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD,
-        TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT));
-    assertEquals("TestCodec",
-        inputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, ""));
-    assertTrue(inputConf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY).startsWith
-        ("SerClass2,SerClass1"));
-  }
-
-  @Test
-  public void testSpecificIOConfs() {
-    // Ensures that Output and Input confs are not mixed.
-    UnorderedKVEdgeConfigurer.Builder builder =
-        UnorderedKVEdgeConfigurer.newBuilder("KEY", "VALUE");
-
-    UnorderedKVEdgeConfigurer configuration = builder.build();
-
-    UnorderedKVOutputConfigurer rebuiltOutput =
-        new UnorderedKVOutputConfigurer();
-    rebuiltOutput.fromUserPayload(configuration.getOutputPayload());
-    UnorderedKVInputConfigurer rebuiltInput =
-        new UnorderedKVInputConfigurer();
-    rebuiltInput.fromUserPayload(configuration.getInputPayload());
-
-    Configuration outputConf = rebuiltOutput.conf;
-    assertEquals("TestCodec",
-        outputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, "DEFAULT"));
-
-    Configuration inputConf = rebuiltInput.conf;
-    assertEquals("TestCodec",
-        inputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, "DEFAULT"));
-  }
-
-  @Test
-  public void tetCommonConf() {
-
-    Configuration fromConf = new Configuration(false);
-    fromConf.set("test.conf.key.1", "confkey1");
-    fromConf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD, false);
-    fromConf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, 0.11f);
-    fromConf.set("io.shouldExist", "io");
-    Map<String, String> additionalConfs = new HashMap<String, String>();
-    additionalConfs.put("test.key.2", "key2");
-    additionalConfs.put(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, "1111");
-    additionalConfs.put(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT, "0.22f");
-    additionalConfs.put("file.shouldExist", "file");
-
-    UnorderedKVEdgeConfigurer.Builder builder = UnorderedKVEdgeConfigurer
-        .newBuilder("KEY",
-            "VALUE")
-        .setAdditionalConfiguration("fs.shouldExist", "fs")
-        .setAdditionalConfiguration("test.key.1", "key1")
-        .setAdditionalConfiguration(TezRuntimeConfiguration.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE, "3333")
-        .setAdditionalConfiguration(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, "0.33f")
-        .setAdditionalConfiguration(additionalConfs)
-        .setFromConfiguration(fromConf);
-
-    UnorderedKVEdgeConfigurer configuration = builder.build();
-
-    UnorderedKVOutputConfigurer rebuiltOutput =
-        new UnorderedKVOutputConfigurer();
-    rebuiltOutput.fromUserPayload(configuration.getOutputPayload());
-    UnorderedKVInputConfigurer rebuiltInput =
-        new UnorderedKVInputConfigurer();
-    rebuiltInput.fromUserPayload(configuration.getInputPayload());
-
-    Configuration outputConf = rebuiltOutput.conf;
-    Configuration inputConf = rebuiltInput.conf;
-
-    assertEquals(false, outputConf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD, true));
-    assertEquals(1111, outputConf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, 0));
-    assertEquals(3333, outputConf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE, 0));
-    assertNull(outputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT));
-    assertNull(outputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT));
-    assertNull(outputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT));
-    assertEquals("io", outputConf.get("io.shouldExist"));
-    assertEquals("file", outputConf.get("file.shouldExist"));
-    assertEquals("fs", outputConf.get("fs.shouldExist"));
-
-
-    assertEquals(false, inputConf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD, true));
-    assertEquals(1111, inputConf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, 0));
-    assertEquals(3333, inputConf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE, 0));
-    assertEquals(0.11f,
-        inputConf.getFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, 0.0f), 0.001f);
-    assertEquals(0.22f,
-        inputConf.getFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT, 0.0f), 0.001f);
-    assertEquals(0.33f, inputConf.getFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, 0.0f),
-        0.001f);
-    assertEquals("io", inputConf.get("io.shouldExist"));
-    assertEquals("file", inputConf.get("file.shouldExist"));
-    assertEquals("fs", inputConf.get("fs.shouldExist"));
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/b526ed5c/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
index ec72205..af82e6c 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
@@ -50,7 +50,7 @@ import org.apache.tez.runtime.api.ObjectRegistry;
 import org.apache.tez.runtime.api.ProcessorContext;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
-import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfigurer;
+import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig;
 import org.apache.tez.runtime.library.output.UnorderedKVOutput;
 import org.apache.tez.runtime.library.processor.SimpleProcessor;
 
@@ -158,7 +158,7 @@ public class BroadcastAndOneToOneExample extends Configured implements Tool {
     oneToOneVertex.setVertexManagerPlugin(
             new VertexManagerPluginDescriptor(InputReadyVertexManager.class.getName()));
 
-    UnorderedKVEdgeConfigurer edgeConf = UnorderedKVEdgeConfigurer
+    UnorderedKVEdgeConfig edgeConf = UnorderedKVEdgeConfig
         .newBuilder(Text.class.getName(), IntWritable.class.getName()).build();
 
     DAG dag = new DAG("BroadcastAndOneToOneExample");

http://git-wip-us.apache.org/repos/asf/tez/blob/b526ed5c/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
index b9651b1..ac4cfb1 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
@@ -75,7 +75,7 @@ import org.apache.tez.mapreduce.examples.processor.FilterByWordOutputProcessor;
 import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.output.MROutput;
-import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfigurer;
+import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig;
 
 import com.google.common.collect.Sets;
 
@@ -180,8 +180,8 @@ public class FilterLinesByWord extends Configured implements Tool {
       stage1Conf.setBoolean("mapred.mapper.new-api", false);
       dsd = MRInputHelpers.configureMRInputWithLegacySplitGeneration(stage1Conf, stagingDir, true);
     } else {
-      dsd = MRInputLegacy.createConfigurer(stage1Conf, TextInputFormat.class, inputPath)
-          .groupSplits(false).create();
+      dsd = MRInputLegacy.createConfigBuilder(stage1Conf, TextInputFormat.class, inputPath)
+          .groupSplits(false).build();
     }
     stage1Vertex.addDataSource("MRInput", dsd);
 
@@ -197,7 +197,7 @@ public class FilterLinesByWord extends Configured implements Tool {
     OutputCommitterDescriptor ocd = new OutputCommitterDescriptor(MROutputCommitter.class.getName());
     stage2Vertex.addDataSink("MROutput", new DataSinkDescriptor(od, ocd, null));
 
-    UnorderedKVEdgeConfigurer edgeConf = UnorderedKVEdgeConfigurer
+    UnorderedKVEdgeConfig edgeConf = UnorderedKVEdgeConfig
         .newBuilder(Text.class.getName(), TextLongPair.class.getName()).build();
 
     DAG dag = new DAG("FilterLinesByWord");

http://git-wip-us.apache.org/repos/asf/tez/blob/b526ed5c/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
index fe3a1fd..a52cb5a 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
@@ -68,7 +68,7 @@ import org.apache.tez.mapreduce.examples.processor.FilterByWordOutputProcessor;
 import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.output.MROutput;
-import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfigurer;
+import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig;
 
 public class FilterLinesByWordOneToOne extends Configured implements Tool {
 
@@ -171,8 +171,8 @@ public class FilterLinesByWordOneToOne extends Configured implements Tool {
       stage1Conf.setBoolean("mapred.mapper.new-api", false);
       dsd = MRInputHelpers.configureMRInputWithLegacySplitGeneration(stage1Conf, stagingDir, true);
     } else {
-      dsd = MRInputLegacy.createConfigurer(stage1Conf, TextInputFormat.class, inputPath)
-          .groupSplits(false).create();
+      dsd = MRInputLegacy.createConfigBuilder(stage1Conf, TextInputFormat.class, inputPath)
+          .groupSplits(false).build();
     }
     stage1Vertex.addDataSource("MRInput", dsd);
 
@@ -189,7 +189,7 @@ public class FilterLinesByWordOneToOne extends Configured implements Tool {
             .setUserPayload(TezUtils.createUserPayloadFromConf(stage2Conf)),
             new OutputCommitterDescriptor(MROutputCommitter.class.getName()), null));
 
-    UnorderedKVEdgeConfigurer edgeConf = UnorderedKVEdgeConfigurer
+    UnorderedKVEdgeConfig edgeConf = UnorderedKVEdgeConfig
         .newBuilder(Text.class.getName(), TextLongPair.class.getName()).build();
 
     DAG dag = new DAG("FilterLinesByWord");

http://git-wip-us.apache.org/repos/asf/tez/blob/b526ed5c/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
index b22a4a8..daad5ba 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
@@ -82,7 +82,7 @@ import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.output.MROutputLegacy;
 import org.apache.tez.mapreduce.processor.map.MapProcessor;
 import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
-import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfigurer;
+import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -490,8 +490,8 @@ public class MRRSleepJob extends Configured implements Tool {
       dataSource = MRInputHelpers
           .configureMRInputWithLegacySplitGeneration(mapStageConf, remoteStagingDir, true);
     } else {
-      dataSource = MRInputLegacy.createConfigurer(mapStageConf, SleepInputFormat.class)
-          .generateSplitsInAM(generateSplitsInAM).create();
+      dataSource = MRInputLegacy.createConfigBuilder(mapStageConf, SleepInputFormat.class)
+          .generateSplitsInAM(generateSplitsInAM).build();
     }
 
     DAG dag = new DAG("MRRSleepJob");
@@ -550,19 +550,19 @@ public class MRRSleepJob extends Configured implements Tool {
       finalReduceVertex = new Vertex("reduce", new ProcessorDescriptor(
           ReduceProcessor.class.getName()).setUserPayload(reducePayload), numReducer);
       finalReduceVertex.setTaskLocalFiles(commonLocalResources);
-      finalReduceVertex.addDataSink("MROutput", MROutputLegacy.createConfigurer(finalReduceConf,
-          NullOutputFormat.class).create());
+      finalReduceVertex.addDataSink("MROutput", MROutputLegacy.createConfigBuilder(finalReduceConf,
+          NullOutputFormat.class).build());
       vertices.add(finalReduceVertex);
     } else {
       // Map only job
       mapVertex.addDataSink("MROutput",
-          MROutputLegacy.createConfigurer(mapStageConf, NullOutputFormat.class).create());
+          MROutputLegacy.createConfigBuilder(mapStageConf, NullOutputFormat.class).build());
     }
 
 
     Map<String, String> partitionerConf = Maps.newHashMap();
     partitionerConf.put(MRJobConfig.PARTITIONER_CLASS_ATTR, MRRSleepJobPartitioner.class.getName());
-    OrderedPartitionedKVEdgeConfigurer edgeConf = OrderedPartitionedKVEdgeConfigurer
+    OrderedPartitionedKVEdgeConfig edgeConf = OrderedPartitionedKVEdgeConfig
         .newBuilder(IntWritable.class.getName(), IntWritable.class.getName(),
             HashPartitioner.class.getName(), partitionerConf).configureInput().useLegacyInput()
         .done().build();

http://git-wip-us.apache.org/repos/asf/tez/blob/b526ed5c/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
index 502b4c4..edfd04a 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
@@ -75,7 +75,7 @@ import org.apache.tez.mapreduce.output.MROutputLegacy;
 import org.apache.tez.mapreduce.processor.map.MapProcessor;
 import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
-import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfigurer;
+import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
 import org.apache.tez.runtime.library.partitioner.HashPartitioner;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -193,7 +193,7 @@ public class TestOrderedWordCount extends Configured implements Tool {
       mapStageConf.setBoolean("mapred.mapper.new-api", true);
       dsd = MRInputHelpers.configureMRInputWithLegacySplitGeneration(mapStageConf, stagingDir, true);
     } else {
-      dsd = MRInputLegacy.createConfigurer(mapStageConf, TextInputFormat.class, inputPath).create();
+      dsd = MRInputLegacy.createConfigBuilder(mapStageConf, TextInputFormat.class, inputPath).build();
     }
 
     Vertex mapVertex = new Vertex("initialmap", new ProcessorDescriptor(
@@ -224,8 +224,8 @@ public class TestOrderedWordCount extends Configured implements Tool {
                 .setHistoryText(finalReduceStageHistoryText), 1);
     finalReduceVertex.setTaskLocalFiles(commonLocalResources);
     finalReduceVertex.addDataSink("MROutput",
-        MROutputLegacy.createConfigurer(finalReduceConf, TextOutputFormat.class, outputPath)
-            .create());
+        MROutputLegacy.createConfigBuilder(finalReduceConf, TextOutputFormat.class, outputPath)
+            .build());
     vertices.add(finalReduceVertex);
 
     DAG dag = new DAG("OrderedWordCount" + dagIndex);
@@ -233,7 +233,7 @@ public class TestOrderedWordCount extends Configured implements Tool {
       dag.addVertex(vertices.get(i));
     }
 
-    OrderedPartitionedKVEdgeConfigurer edgeConf1 = OrderedPartitionedKVEdgeConfigurer
+    OrderedPartitionedKVEdgeConfig edgeConf1 = OrderedPartitionedKVEdgeConfig
         .newBuilder(Text.class.getName(), IntWritable.class.getName(),
             HashPartitioner.class.getName()).setFromConfiguration(conf)
 	    .configureInput().useLegacyInput().done().build();
@@ -241,7 +241,7 @@ public class TestOrderedWordCount extends Configured implements Tool {
         new Edge(dag.getVertex("initialmap"), dag.getVertex("intermediate_reducer"),
             edgeConf1.createDefaultEdgeProperty()));
 
-    OrderedPartitionedKVEdgeConfigurer edgeConf2 = OrderedPartitionedKVEdgeConfigurer
+    OrderedPartitionedKVEdgeConfig edgeConf2 = OrderedPartitionedKVEdgeConfig
         .newBuilder(IntWritable.class.getName(), Text.class.getName(),
             HashPartitioner.class.getName()).setFromConfiguration(conf)
             .configureInput().useLegacyInput().done().build();

http://git-wip-us.apache.org/repos/asf/tez/blob/b526ed5c/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
index a9b23e0..e136558 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
@@ -50,7 +50,6 @@ import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.StatusGetOpts;
 import org.apache.tez.mapreduce.input.MRInput;
-import org.apache.tez.mapreduce.input.MRInput.MRInputConfigurer;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
 import org.apache.tez.runtime.api.LogicalInput;
@@ -59,7 +58,7 @@ import org.apache.tez.runtime.api.ProcessorContext;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
 import org.apache.tez.runtime.library.api.KeyValuesReader;
-import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfigurer;
+import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
 import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValuesInput;
 import org.apache.tez.runtime.library.partitioner.HashPartitioner;
 
@@ -169,9 +168,10 @@ public class UnionExample {
     
     int numMaps = -1;
     Configuration inputConf = new Configuration(tezConf);
-    MRInputConfigurer configurer = MRInput.createConfigurer(inputConf, TextInputFormat.class,
+    MRInput.MRInputConfigBuilder configurer = MRInput.createConfigBuilder(inputConf,
+        TextInputFormat.class,
         inputPath);
-    DataSourceDescriptor dataSource = configurer.generateSplitsInAM(false).create();
+    DataSourceDescriptor dataSource = configurer.generateSplitsInAM(false).build();
 
     Vertex mapVertex1 = new Vertex("map1", new ProcessorDescriptor(
         TokenProcessor.class.getName()), numMaps).addDataSource("MRInput", dataSource);
@@ -186,23 +186,23 @@ public class UnionExample {
         UnionProcessor.class.getName()), 1);
 
     Configuration outputConf = new Configuration(tezConf);
-    DataSinkDescriptor od = MROutput.createConfigurer(outputConf,
-        TextOutputFormat.class, outputPath).create();
+    DataSinkDescriptor od = MROutput.createConfigBuilder(outputConf,
+        TextOutputFormat.class, outputPath).build();
     checkerVertex.addDataSink("union", od);
     
 
     Configuration allPartsConf = new Configuration(tezConf);
-    DataSinkDescriptor od2 = MROutput.createConfigurer(allPartsConf,
-        TextOutputFormat.class, outputPath + "-all-parts").create();
+    DataSinkDescriptor od2 = MROutput.createConfigBuilder(allPartsConf,
+        TextOutputFormat.class, outputPath + "-all-parts").build();
     checkerVertex.addDataSink("all-parts", od2);
 
     Configuration partsConf = new Configuration(tezConf);    
-    DataSinkDescriptor od1 = MROutput.createConfigurer(partsConf,
-        TextOutputFormat.class, outputPath + "-parts").create();
+    DataSinkDescriptor od1 = MROutput.createConfigBuilder(partsConf,
+        TextOutputFormat.class, outputPath + "-parts").build();
     VertexGroup unionVertex = dag.createVertexGroup("union", mapVertex1, mapVertex2);
     unionVertex.addDataSink("parts", od1);
 
-    OrderedPartitionedKVEdgeConfigurer edgeConf = OrderedPartitionedKVEdgeConfigurer
+    OrderedPartitionedKVEdgeConfig edgeConf = OrderedPartitionedKVEdgeConfig
         .newBuilder(Text.class.getName(), IntWritable.class.getName(),
             HashPartitioner.class.getName()).build();
 

http://git-wip-us.apache.org/repos/asf/tez/blob/b526ed5c/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
index fe489ff..4f3eb26 100644
--- a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
+++ b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
@@ -672,11 +672,11 @@ public class TestMRRJobsDAGApi {
           .configureMRInputWithLegacySplitGeneration(stage1Conf, remoteStagingDir, true);
     } else {
       if (initializerClass == null) {
-        dsd = MRInputLegacy.createConfigurer(stage1Conf, SleepInputFormat.class).create();
+        dsd = MRInputLegacy.createConfigBuilder(stage1Conf, SleepInputFormat.class).build();
       } else {
         InputInitializerDescriptor iid = new InputInitializerDescriptor(inputInitializerClazz.getName());
-        dsd = MRInputLegacy.createConfigurer(stage1Conf, SleepInputFormat.class)
-            .setCustomInitializerDescriptor(iid).create();
+        dsd = MRInputLegacy.createConfigBuilder(stage1Conf, SleepInputFormat.class)
+            .setCustomInitializerDescriptor(iid).build();
       }
     }
 
@@ -691,7 +691,7 @@ public class TestMRRJobsDAGApi {
         ReduceProcessor.class.getName()).setUserPayload(stage3Payload),
         1, Resource.newInstance(256, 1));
     stage3Vertex.addDataSink("MROutput",
-        MROutputLegacy.createConfigurer(stage3Conf, NullOutputFormat.class).create());
+        MROutputLegacy.createConfigBuilder(stage3Conf, NullOutputFormat.class).build());
 
     // TODO env, resources
 


Mime
View raw message