tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [4/6] TEZ-1417. Rename *Configurer to ConfigBuilder/Config. (sseth)
Date Tue, 19 Aug 2014 00:02:10 GMT
http://git-wip-us.apache.org/repos/asf/tez/blob/b526ed5c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVOutputConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVOutputConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVOutputConfigurer.java
deleted file mode 100644
index dd73c29..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVOutputConfigurer.java
+++ /dev/null
@@ -1,441 +0,0 @@
-/*
- * *
- *  * Licensed to the Apache Software Foundation (ASF) under one
- *  * or more contributor license agreements.  See the NOTICE file
- *  * distributed with this work for additional information
- *  * regarding copyright ownership.  The ASF licenses this file
- *  * to you under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance
- *  * with the License.  You may obtain a copy of the License at
- *  *
- *  *     http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.tez.runtime.library.conf;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.util.Map;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
-import org.apache.tez.runtime.library.common.ConfigUtils;
-import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;
-
-
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-/**
- * Configure {@link org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput} </p>
- * 
- * Values will be picked up from tez-site if not specified, otherwise defaults from
- * {@link org.apache.tez.runtime.library.api.TezRuntimeConfiguration} will be used.
- */
-public class OrderedPartitionedKVOutputConfigurer {
-
-  /**
-   * Configure parameters which are specific to the Output.
-   */
-  @InterfaceAudience.Private
-  public static interface SpecificConfigurer<T> extends BaseConfigurer<T> {
-    /**
-     * Set the buffer size to use when sort the output
-     *
-     * @param sortBufferSize the size of the buffer in MB
-     * @return instance of the current builder
-     */
-    public T setSortBufferSize(int sortBufferSize);
-
-
-    /**
-     * Configure the combiner class
-     *
-     * @param combinerClassName the combiner class name
-     * @return instance of the current builder
-     */
-    public T setCombiner(String combinerClassName);
-
-    /**
-     * Configure the combiner class and it's associated configuration (specified as key-value
-     * pairs). This method should only be used if the combiner requires some specific configuration.
-     * {@link #setCombiner(String)} is the preferred method for setting a combiner.
-     *
-     * @param combinerClassName the combiner class name
-     * @param combinerConf      the combiner configuration. This can be null, and otherwise
-     *                          is a {@link java.util.Map} of key-value pairs. The keys should
-     *                          be limited to the ones required by the combiner.
-     * @return instance of the current builder
-     */
-    public T setCombiner(String combinerClassName, @Nullable Map<String, String> combinerConf);
-
-
-
-    /**
-     * Configure the number of threads to be used by the sorter
-     *
-     * @param numThreads the number of threads
-     * @return instance of the current builder
-     */
-    public T setSorterNumThreads(int numThreads);
-  }
-
-  @SuppressWarnings("rawtypes")
-  @InterfaceAudience.Public
-  @InterfaceStability.Evolving
-  public static class SpecificBuilder<E extends HadoopKeyValuesBasedBaseEdgeConfigurer.Builder> implements
-      SpecificConfigurer<SpecificBuilder> {
-
-    private final E edgeBuilder;
-    private final Builder builder;
-
-    SpecificBuilder(E edgeBuilder, Builder builder) {
-      this.edgeBuilder = edgeBuilder;
-      this.builder = builder;
-    }
-
-    @Override
-    public SpecificBuilder<E> setSortBufferSize(int sortBufferSize) {
-      builder.setSortBufferSize(sortBufferSize);
-      return this;
-    }
-
-    public SpecificBuilder<E> setCombiner(String combinerClassName) {
-      return this.setCombiner(combinerClassName, null);
-    }
-
-    @Override
-    public SpecificBuilder<E> setCombiner(String combinerClassName, Map<String, String> combinerConf) {
-      builder.setCombiner(combinerClassName, combinerConf);
-      return this;
-    }
-
-    @Override
-    public SpecificBuilder<E> setSorterNumThreads(int numThreads) {
-      builder.setSorterNumThreads(numThreads);
-      return this;
-    }
-
-    @Override
-    public SpecificBuilder setAdditionalConfiguration(String key, String value) {
-      builder.setAdditionalConfiguration(key, value);
-      return this;
-    }
-
-    @Override
-    public SpecificBuilder setAdditionalConfiguration(Map<String, String> confMap) {
-      builder.setAdditionalConfiguration(confMap);
-      return this;
-    }
-
-    @Override
-    public SpecificBuilder setFromConfiguration(Configuration conf) {
-      builder.setFromConfiguration(conf);
-      return this;
-    }
-
-    public E done() {
-      return edgeBuilder;
-    }
-  }
-
-  @InterfaceAudience.Private
-  @VisibleForTesting
-  Configuration conf;
-
-  @InterfaceAudience.Private
-  @VisibleForTesting
-  OrderedPartitionedKVOutputConfigurer() {
-  }
-
-  private OrderedPartitionedKVOutputConfigurer(Configuration conf) {
-    this.conf = conf;
-  }
-
-  /**
-   * Get a UserPayload representation of the Configuration
-   * @return a {@link org.apache.tez.dag.api.UserPayload} instance
-   */
-  public UserPayload toUserPayload() {
-    try {
-      return TezUtils.createUserPayloadFromConf(conf);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @InterfaceAudience.Private
-  public void fromUserPayload(UserPayload payload) {
-    try {
-      this.conf = TezUtils.createConfFromUserPayload(payload);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  public static Builder newBuilder(String keyClass, String valueClass, String partitionerClassName) {
-    return newBuilder(keyClass, valueClass, partitionerClassName, null);
-  }
-
-  public static Builder newBuilder(String keyClass, String valueClass, String partitionerClassName,
-                                   Map<String, String> partitionerConf) {
-    return new Builder(keyClass, valueClass, partitionerClassName, partitionerConf);
-  }
-
-  @InterfaceAudience.Public
-  @InterfaceStability.Evolving
-  public static class Builder implements SpecificConfigurer<Builder> {
-
-    private final Configuration conf = new Configuration(false);
-
-    /**
-     * Create a configuration builder for {@link org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput}
-     *
-     * @param keyClassName         the key class name
-     * @param valueClassName       the value class name
-     * @param partitionerClassName the partitioner class name
-     * @param partitionerConf      the partitioner configuration. This can be null, and is a {@link
-     *                             java.util.Map} of key-value pairs. The keys should be limited to
-     *                             the ones required by the partitioner.
-     */
-    @InterfaceAudience.Private
-    Builder(String keyClassName, String valueClassName, String partitionerClassName,
-                   @Nullable Map<String, String> partitionerConf) {
-      this();
-      Preconditions.checkNotNull(keyClassName, "Key class name cannot be null");
-      Preconditions.checkNotNull(valueClassName, "Value class name cannot be null");
-      Preconditions.checkNotNull(partitionerClassName, "Partitioner class name cannot be null");
-      setKeyClassName(keyClassName);
-      setValueClassName(valueClassName);
-      setPartitioner(partitionerClassName, partitionerConf);
-    }
-
-    @InterfaceAudience.Private
-    Builder() {
-      Map<String, String> tezDefaults = ConfigUtils
-          .extractConfigurationMap(TezRuntimeConfiguration.getTezRuntimeConfigDefaults(),
-              OrderedPartitionedKVOutput.getConfigurationKeySet());
-      ConfigUtils.addConfigMapToConfiguration(this.conf, tezDefaults);
-      ConfigUtils.addConfigMapToConfiguration(this.conf, TezRuntimeConfiguration.getOtherConfigDefaults());
-    }
-
-    @InterfaceAudience.Private
-    Builder setKeyClassName(String keyClassName) {
-      Preconditions.checkNotNull(keyClassName, "Key class name cannot be null");
-      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, keyClassName);
-      return this;
-    }
-
-    @InterfaceAudience.Private
-    Builder setValueClassName(String valueClassName) {
-      Preconditions.checkNotNull(valueClassName, "Value class name cannot be null");
-      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, valueClassName);
-      return this;
-    }
-
-    @InterfaceAudience.Private
-    Builder setPartitioner(String partitionerClassName, @Nullable Map<String, String> partitionerConf) {
-      Preconditions.checkNotNull(partitionerClassName, "Partitioner class name cannot be null");
-      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, partitionerClassName);
-      if (partitionerConf != null) {
-        // Merging the confs for now. Change to be specific in the future.
-        ConfigUtils.mergeConfsWithExclusions(this.conf, partitionerConf,
-            TezRuntimeConfiguration.getRuntimeConfigKeySet());
-      }
-      return this;
-    }
-
-    @Override
-    public Builder setSortBufferSize(int sortBufferSize) {
-      this.conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, sortBufferSize);
-      return this;
-    }
-
-    @Override
-    public Builder setCombiner(String combinerClassName) {
-      return this.setCombiner(combinerClassName, null);
-    }
-
-    @Override
-    public Builder setCombiner(String combinerClassName, Map<String, String> combinerConf) {
-      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS, combinerClassName);
-      if (combinerConf != null) {
-        // Merging the confs for now. Change to be specific in the future.
-        ConfigUtils.mergeConfsWithExclusions(this.conf, combinerConf,
-            TezRuntimeConfiguration.getRuntimeConfigKeySet());
-      }
-      return this;
-    }
-
-    @Override
-    public Builder setSorterNumThreads(int numThreads) {
-      this.conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS, numThreads);
-      return this;
-    }
-
-    @Override
-    public Builder setAdditionalConfiguration(String key, String value) {
-      Preconditions.checkNotNull(key, "Key cannot be null");
-      if (ConfigUtils.doesKeyQualify(key,
-          Lists.newArrayList(OrderedPartitionedKVOutput.getConfigurationKeySet(),
-              TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()),
-          TezRuntimeConfiguration.getAllowedPrefixes())) {
-        if (value == null) {
-          this.conf.unset(key);
-        } else {
-          this.conf.set(key, value);
-        }
-      }
-      return this;
-    }
-
-    @Override
-    public Builder setAdditionalConfiguration(Map<String, String> confMap) {
-      Preconditions.checkNotNull(confMap, "ConfMap cannot be null");
-      Map<String, String> map = ConfigUtils.extractConfigurationMap(confMap,
-          Lists.newArrayList(OrderedPartitionedKVOutput.getConfigurationKeySet(),
-              TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()), TezRuntimeConfiguration.getAllowedPrefixes());
-      ConfigUtils.addConfigMapToConfiguration(this.conf, map);
-      return this;
-    }
-
-    @Override
-    public Builder setFromConfiguration(Configuration conf) {
-      // Maybe ensure this is the first call ? Otherwise this can end up overriding other parameters
-      Preconditions.checkArgument(conf != null, "Configuration cannot be null");
-      Map<String, String> map = ConfigUtils.extractConfigurationMap(conf,
-          Lists.newArrayList(OrderedPartitionedKVOutput.getConfigurationKeySet(),
-              TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()), TezRuntimeConfiguration.getAllowedPrefixes());
-      ConfigUtils.addConfigMapToConfiguration(this.conf, map);
-      return this;
-    }
-
-    /**
-     * Set the key comparator class
-     *
-     * @param comparatorClassName the key comparator class name
-     * @return instance of the current builder
-     */
-    public Builder setKeyComparatorClass(String comparatorClassName) {
-      return this.setKeyComparatorClass(comparatorClassName, null);
-    }
-
-    /**
-     * Set the key comparator class and it's associated configuration. This method should only be
-     * used if the comparator requires some specific configuration, which is typically not the
-     * case. {@link #setKeyComparatorClass(String)} is the preferred method for setting a
-     * comparator.
-     *
-     * @param comparatorClassName the key comparator class name
-     * @param comparatorConf      the comparator configuration. This can be null, and is a {@link
-     *                            java.util.Map} of key-value pairs. The keys should be limited to
-     *                            the ones required by the comparator.
-     * @return instance of the current builder
-     */
-    public Builder setKeyComparatorClass(String comparatorClassName,
-                                         @Nullable Map<String, String> comparatorConf) {
-      Preconditions.checkNotNull(comparatorClassName, "Comparator class name cannot be null");
-      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS,
-          comparatorClassName);
-      if (comparatorConf != null) {
-        // Merging the confs for now. Change to be specific in the future.
-        ConfigUtils.mergeConfsWithExclusions(this.conf, comparatorConf,
-            TezRuntimeConfiguration.getRuntimeConfigKeySet());
-      }
-      return this;
-    }
-
-    public Builder setCompression(boolean enabled, @Nullable String compressionCodec,
-                                  @Nullable Map<String, String> codecConf) {
-      this.conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, enabled);
-      if (enabled && compressionCodec != null) {
-        this.conf
-            .set(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, compressionCodec);
-      }
-      if (codecConf != null) {
-        // Merging the confs for now. Change to be specific in the future.
-        ConfigUtils.mergeConfsWithExclusions(this.conf, codecConf,
-            TezRuntimeConfiguration.getRuntimeConfigKeySet());
-      }
-      return this;
-    }
-
-    /**
-     * Set serialization class and the relevant comparator to be used for sorting.
-     * Providing custom serialization class could change the way, keys needs to be compared in
-     * sorting. Providing invalid comparator here could create invalid results.
-     *
-     * @param serializationClassName
-     * @param comparatorClassName
-     * @param serializerConf         the serializer configuration. This can be null, and is a
-     *                               {@link java.util.Map} of key-value pairs. The keys should be limited
-     *                               to the ones required by the comparator.
-     * @return
-     */
-    public Builder setKeySerializationClass(String serializationClassName,
-        String comparatorClassName, @Nullable Map<String, String> serializerConf) {
-      Preconditions.checkArgument(serializationClassName != null,
-          "serializationClassName cannot be null");
-      Preconditions.checkArgument(comparatorClassName != null,
-          "comparator cannot be null");
-      this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + ","
-          + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
-      setKeyComparatorClass(comparatorClassName, null);
-      if (serializerConf != null) {
-        // Merging the confs for now. Change to be specific in the future.
-        ConfigUtils.mergeConfsWithExclusions(this.conf, serializerConf,
-            TezRuntimeConfiguration.getRuntimeConfigKeySet());
-      }
-      return this;
-    }
-
-    /**
-     * Set serialization class responsible for providing serializer/deserializer for values.
-     *
-     * @param serializationClassName
-     * @param serializerConf         the serializer configuration. This can be null, and is a
-     *                               {@link java.util.Map} of key-value pairs. The keys should be limited
-     *                               to the ones required by the comparator.
-     * @return
-     */
-    public Builder setValueSerializationClass(String serializationClassName,
-                                              @Nullable Map<String, String> serializerConf) {
-      Preconditions.checkArgument(serializationClassName != null,
-          "serializationClassName cannot be null");
-      this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + ","
-          + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
-      if (serializerConf != null) {
-        // Merging the confs for now. Change to be specific in the future.
-        ConfigUtils.mergeConfsWithExclusions(this.conf, serializerConf,
-            TezRuntimeConfiguration.getRuntimeConfigKeySet());
-      }
-      return this;
-    }
-
-    /**
-     * Create the actual configuration instance.
-     *
-     * @return an instance of the Configuration
-     */
-    public OrderedPartitionedKVOutputConfigurer build() {
-      return new OrderedPartitionedKVOutputConfigurer(this.conf);
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/tez/blob/b526ed5c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVEdgeConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVEdgeConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVEdgeConfig.java
new file mode 100644
index 0000000..47c0c76
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVEdgeConfig.java
@@ -0,0 +1,260 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.tez.runtime.library.conf;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.runtime.library.input.UnorderedKVInput;
+import org.apache.tez.runtime.library.output.UnorderedKVOutput;
+
+/**
+ * Configure payloads for the UnorderedKVOutput and UnorderedKVInput pair.
+ * <p>
+ * Values will be picked up from tez-site if not specified, otherwise defaults from
+ * {@link org.apache.tez.runtime.library.api.TezRuntimeConfiguration} will be used.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class UnorderedKVEdgeConfig extends HadoopKeyValuesBasedBaseEdgeConfig {
+  private final UnorderedKVOutputConfig outputConf;
+  private final UnorderedKVInputConfig inputConf;
+
+  private UnorderedKVEdgeConfig(
+      UnorderedKVOutputConfig outputConfiguration,
+      UnorderedKVInputConfig inputConfiguration) {
+    this.outputConf = outputConfiguration;
+    this.inputConf = inputConfiguration;
+
+  }
+
+  /**
+   * Create a builder to configure the relevant Input and Output
+   * @param keyClassName the key class name
+   * @param valueClassName the value class name
+   * @return a builder to configure the edge
+   */
+  public static Builder newBuilder(String keyClassName, String valueClassName) {
+    return new Builder(keyClassName, valueClassName);
+  }
+
+  @Override
+  public UserPayload getOutputPayload() {
+    return outputConf.toUserPayload();
+  }
+
+  @Override
+  public String getOutputClassName() {
+    return UnorderedKVOutput.class.getName();
+  }
+
+  @Override
+  public UserPayload getInputPayload() {
+    return inputConf.toUserPayload();
+  }
+
+  @Override
+  public String getInputClassName() {
+    return UnorderedKVInput.class.getName();
+  }
+
+  /**
+   * This is a convenience method for the typical usage of this edge, and creates an instance of
+   * {@link org.apache.tez.dag.api.EdgeProperty} which is likely to be used. <p>
+   * If custom edge properties are required, the methods to get the relevant payloads should be
+   * used. <p>
+   * In this case - DataMovementType.BROADCAST, EdgeProperty.DataSourceType.PERSISTED,
+   * EdgeProperty.SchedulingType.SEQUENTIAL.
+   *
+   * @return an {@link org.apache.tez.dag.api.EdgeProperty} instance
+   */
+  public EdgeProperty createDefaultBroadcastEdgeProperty() {
+    EdgeProperty edgeProperty = new EdgeProperty(EdgeProperty.DataMovementType.BROADCAST,
+        EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL,
+        new OutputDescriptor(
+            getOutputClassName()).setUserPayload(getOutputPayload()),
+        new InputDescriptor(
+            getInputClassName()).setUserPayload(getInputPayload()));
+    return edgeProperty;
+  }
+
+  /**
+   * This is a convenience method for the typical usage of this edge, and creates an instance of
+   * {@link org.apache.tez.dag.api.EdgeProperty} which is likely to be used. <p>
+   * If custom edge properties are required, the methods to get the relevant payloads should be
+   * used. <p>
+   * In this case - DataMovementType.ONE_TO_ONE, EdgeProperty.DataSourceType.PERSISTED,
+   * EdgeProperty.SchedulingType.SEQUENTIAL.
+   *
+   * @return an {@link org.apache.tez.dag.api.EdgeProperty} instance
+   */
+  public EdgeProperty createDefaultOneToOneEdgeProperty() {
+    EdgeProperty edgeProperty = new EdgeProperty(EdgeProperty.DataMovementType.ONE_TO_ONE,
+        EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL,
+        new OutputDescriptor(
+            getOutputClassName()).setUserPayload(getOutputPayload()),
+        new InputDescriptor(
+            getInputClassName()).setUserPayload(getInputPayload()));
+    return edgeProperty;
+  }
+
+  /**
+   * This is a convenience method for creating an Edge descriptor based on the specified
+   * {@link org.apache.tez.dag.api.EdgeManagerPluginDescriptor}.
+   *
+   * @param edgeManagerDescriptor the custom edge specification
+   * @return an {@link org.apache.tez.dag.api.EdgeProperty} instance
+   */
+  public EdgeProperty createDefaultCustomEdgeProperty(EdgeManagerPluginDescriptor edgeManagerDescriptor) {
+    Preconditions.checkNotNull(edgeManagerDescriptor, "EdgeManagerDescriptor cannot be null");
+    EdgeProperty edgeProperty =
+        new EdgeProperty(edgeManagerDescriptor, EdgeProperty.DataSourceType.PERSISTED,
+            EdgeProperty.SchedulingType.SEQUENTIAL,
+            new OutputDescriptor(getOutputClassName()).setUserPayload(getOutputPayload()),
+            new InputDescriptor(getInputClassName()).setUserPayload(getInputPayload()));
+    return edgeProperty;
+  }
+
+  @InterfaceAudience.Public
+  @InterfaceStability.Evolving
+  public static class Builder extends HadoopKeyValuesBasedBaseEdgeConfig.Builder<Builder> {
+
+    private final UnorderedKVOutputConfig.Builder outputBuilder =
+        new UnorderedKVOutputConfig.Builder();
+    private final UnorderedKVOutputConfig.SpecificBuilder<UnorderedKVEdgeConfig.Builder>
+        specificOutputBuilder =
+        new UnorderedKVOutputConfig.SpecificBuilder<UnorderedKVEdgeConfig.Builder>(
+            this, outputBuilder);
+
+    private final UnorderedKVInputConfig.Builder inputBuilder =
+        new UnorderedKVInputConfig.Builder();
+    private final UnorderedKVInputConfig.SpecificBuilder<UnorderedKVEdgeConfig.Builder>
+        specificInputBuilder =
+        new UnorderedKVInputConfig.SpecificBuilder<UnorderedKVEdgeConfig.Builder>(
+            this, inputBuilder);
+
+    @InterfaceAudience.Private
+    Builder(String keyClassName, String valueClassName) {
+      outputBuilder.setKeyClassName(keyClassName);
+      outputBuilder.setValueClassName(valueClassName);
+      inputBuilder.setKeyClassName(keyClassName);
+      inputBuilder.setValueClassName(valueClassName);
+    }
+
+    @Override
+    public Builder setCompression(boolean enabled, @Nullable String compressionCodec,
+                                  @Nullable Map<String, String> codecConf) {
+      outputBuilder.setCompression(enabled, compressionCodec, codecConf);
+      inputBuilder.setCompression(enabled, compressionCodec, codecConf);
+      return this;
+    }
+
+    @Override
+    public Builder setAdditionalConfiguration(String key, String value) {
+      outputBuilder.setAdditionalConfiguration(key, value);
+      inputBuilder.setAdditionalConfiguration(key, value);
+      return this;
+    }
+
+    @Override
+    public Builder setAdditionalConfiguration(Map<String, String> confMap) {
+      outputBuilder.setAdditionalConfiguration(confMap);
+      inputBuilder.setAdditionalConfiguration(confMap);
+      return this;
+    }
+
+    @Override
+    public Builder setFromConfiguration(Configuration conf) {
+      outputBuilder.setFromConfiguration(conf);
+      inputBuilder.setFromConfiguration(conf);
+      return this;
+    }
+
+    /**
+     * Set the serialization class responsible for providing the serializer/deserializer for keys.
+     * The same serialization is applied to both the output and the input of this edge.
+     *
+     * @param serializationClassName the serialization class name
+     * @param serializerConf         the serializer configuration. This can be null, and is a
+     *                               {@link java.util.Map} of key-value pairs. The keys should be limited
+     *                               to the ones required by the serializer.
+     * @return instance of the current builder
+     */
+    public Builder setKeySerializationClass(String serializationClassName,
+                                            @Nullable Map<String, String> serializerConf) {
+      outputBuilder.setKeySerializationClass(serializationClassName, serializerConf);
+      inputBuilder.setKeySerializationClass(serializationClassName, serializerConf);
+      return this;
+    }
+
+    /**
+     * Set serialization class responsible for providing serializer/deserializer for values.
+     *
+     * @param serializationClassName the serialization class name
+     * @param serializerConf         the serializer configuration. This can be null, and is a
+     *                               {@link java.util.Map} of key-value pairs. The keys should be limited
+     *                               to the ones required by the serializer.
+     * @return instance of the current builder
+     */
+    public Builder setValueSerializationClass(String serializationClassName,
+                                              @Nullable Map<String, String> serializerConf) {
+      outputBuilder.setValueSerializationClass(serializationClassName, serializerConf);
+      inputBuilder.setValueSerializationClass(serializationClassName, serializerConf);
+      return this;
+    }
+
+    /**
+     * Configure the specific output
+     * @return a builder to configure the output
+     */
+    public UnorderedKVOutputConfig.SpecificBuilder<Builder> configureOutput() {
+      return specificOutputBuilder;
+    }
+
+    /**
+     * Configure the specific input
+     * @return a builder to configure the input
+     */
+    public UnorderedKVInputConfig.SpecificBuilder<Builder> configureInput() {
+      return specificInputBuilder;
+    }
+
+    /**
+     * Build and return an instance of the configuration
+     * @return an instance of the actual configuration
+     */
+    public UnorderedKVEdgeConfig build() {
+      return new UnorderedKVEdgeConfig(outputBuilder.build(), inputBuilder.build());
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/b526ed5c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVEdgeConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVEdgeConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVEdgeConfigurer.java
deleted file mode 100644
index e96e0ef..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVEdgeConfigurer.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/*
- * *
- *  * Licensed to the Apache Software Foundation (ASF) under one
- *  * or more contributor license agreements.  See the NOTICE file
- *  * distributed with this work for additional information
- *  * regarding copyright ownership.  The ASF licenses this file
- *  * to you under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance
- *  * with the License.  You may obtain a copy of the License at
- *  *
- *  *     http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.tez.runtime.library.conf;
-
-import javax.annotation.Nullable;
-
-import java.util.Map;
-
-import com.google.common.base.Preconditions;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
-import org.apache.tez.dag.api.EdgeProperty;
-import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.api.OutputDescriptor;
-import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.runtime.library.input.UnorderedKVInput;
-import org.apache.tez.runtime.library.output.UnorderedKVOutput;
-
-/**
- * Configure payloads for the UnorderedKVOutput and UnorderedKVInput pair </p>
- *
- * Values will be picked up from tez-site if not specified, otherwise defaults from
- * {@link org.apache.tez.runtime.library.api.TezRuntimeConfiguration} will be used.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public class UnorderedKVEdgeConfigurer extends HadoopKeyValuesBasedBaseEdgeConfigurer {
-  private final UnorderedKVOutputConfigurer outputConf;
-  private final UnorderedKVInputConfigurer inputConf;
-
-  private UnorderedKVEdgeConfigurer(
-      UnorderedKVOutputConfigurer outputConfiguration,
-      UnorderedKVInputConfigurer inputConfiguration) {
-    this.outputConf = outputConfiguration;
-    this.inputConf = inputConfiguration;
-
-  }
-
-  /**
-   * Create a builder to configure the relevant Input and Output
-   * @param keyClassName the key class name
-   * @param valueClassName the value class name
-   * @return a builder to configure the edge
-   */
-  public static Builder newBuilder(String keyClassName, String valueClassName) {
-    return new Builder(keyClassName, valueClassName);
-  }
-
-  @Override
-  public UserPayload getOutputPayload() {
-    return outputConf.toUserPayload();
-  }
-
-  @Override
-  public String getOutputClassName() {
-    return UnorderedKVOutput.class.getName();
-  }
-
-  @Override
-  public UserPayload getInputPayload() {
-    return inputConf.toUserPayload();
-  }
-
-  @Override
-  public String getInputClassName() {
-    return UnorderedKVInput.class.getName();
-  }
-
-  /**
-   * This is a convenience method for the typical usage of this edge, and creates an instance of
-   * {@link org.apache.tez.dag.api.EdgeProperty} which is likely to be used. </p>
-   * If custom edge properties are required, the methods to get the relevant payloads should be
-   * used. </p>
-   * * In this case - DataMovementType.BROADCAST, EdgeProperty.DataSourceType.PERSISTED,
-   * EdgeProperty.SchedulingType.SEQUENTIAL
-   *
-   * @return an {@link org.apache.tez.dag.api.EdgeProperty} instance
-   */
-  public EdgeProperty createDefaultBroadcastEdgeProperty() {
-    EdgeProperty edgeProperty = new EdgeProperty(EdgeProperty.DataMovementType.BROADCAST,
-        EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL,
-        new OutputDescriptor(
-            getOutputClassName()).setUserPayload(getOutputPayload()),
-        new InputDescriptor(
-            getInputClassName()).setUserPayload(getInputPayload()));
-    return edgeProperty;
-  }
-
-  /**
-   * This is a convenience method for the typical usage of this edge, and creates an instance of
-   * {@link org.apache.tez.dag.api.EdgeProperty} which is likely to be used. </p>
-   * If custom edge properties are required, the methods to get the relevant payloads should be
-   * used. </p>
-   * * In this case - DataMovementType.ONE_TO_ONE, EdgeProperty.DataSourceType.PERSISTED,
-   * EdgeProperty.SchedulingType.SEQUENTIAL
-   *
-   * @return an {@link org.apache.tez.dag.api.EdgeProperty} instance
-   */
-  public EdgeProperty createDefaultOneToOneEdgeProperty() {
-    EdgeProperty edgeProperty = new EdgeProperty(EdgeProperty.DataMovementType.ONE_TO_ONE,
-        EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL,
-        new OutputDescriptor(
-            getOutputClassName()).setUserPayload(getOutputPayload()),
-        new InputDescriptor(
-            getInputClassName()).setUserPayload(getInputPayload()));
-    return edgeProperty;
-  }
-
-  /**
-   * This is a convenience method for creating an Edge descriptor based on the specified
-   * EdgeManagerDescriptor.
-   *
-   * @param edgeManagerDescriptor the custom edge specification
-   * @return an {@link org.apache.tez.dag.api.EdgeProperty} instance
-   */
-  public EdgeProperty createDefaultCustomEdgeProperty(EdgeManagerPluginDescriptor edgeManagerDescriptor) {
-    Preconditions.checkNotNull(edgeManagerDescriptor, "EdgeManagerDescriptor cannot be null");
-    EdgeProperty edgeProperty =
-        new EdgeProperty(edgeManagerDescriptor, EdgeProperty.DataSourceType.PERSISTED,
-            EdgeProperty.SchedulingType.SEQUENTIAL,
-            new OutputDescriptor(getOutputClassName()).setUserPayload(getOutputPayload()),
-            new InputDescriptor(getInputClassName()).setUserPayload(getInputPayload()));
-    return edgeProperty;
-  }
-
-  @InterfaceAudience.Public
-  @InterfaceStability.Evolving
-  public static class Builder extends HadoopKeyValuesBasedBaseEdgeConfigurer.Builder<Builder> {
-
-    private final UnorderedKVOutputConfigurer.Builder outputBuilder =
-        new UnorderedKVOutputConfigurer.Builder();
-    private final UnorderedKVOutputConfigurer.SpecificBuilder<UnorderedKVEdgeConfigurer.Builder>
-        specificOutputBuilder =
-        new UnorderedKVOutputConfigurer.SpecificBuilder<UnorderedKVEdgeConfigurer.Builder>(
-            this, outputBuilder);
-
-    private final UnorderedKVInputConfigurer.Builder inputBuilder =
-        new UnorderedKVInputConfigurer.Builder();
-    private final UnorderedKVInputConfigurer.SpecificBuilder<UnorderedKVEdgeConfigurer.Builder>
-        specificInputBuilder =
-        new UnorderedKVInputConfigurer.SpecificBuilder<UnorderedKVEdgeConfigurer.Builder>(
-            this, inputBuilder);
-
-    @InterfaceAudience.Private
-    Builder(String keyClassName, String valueClassName) {
-      outputBuilder.setKeyClassName(keyClassName);
-      outputBuilder.setValueClassName(valueClassName);
-      inputBuilder.setKeyClassName(keyClassName);
-      inputBuilder.setValueClassName(valueClassName);
-    }
-
-    @Override
-    public Builder setCompression(boolean enabled, @Nullable String compressionCodec,
-                                  @Nullable Map<String, String> codecConf) {
-      outputBuilder.setCompression(enabled, compressionCodec, codecConf);
-      inputBuilder.setCompression(enabled, compressionCodec, codecConf);
-      return this;
-    }
-
-    @Override
-    public Builder setAdditionalConfiguration(String key, String value) {
-      outputBuilder.setAdditionalConfiguration(key, value);
-      inputBuilder.setAdditionalConfiguration(key, value);
-      return this;
-    }
-
-    @Override
-    public Builder setAdditionalConfiguration(Map<String, String> confMap) {
-      outputBuilder.setAdditionalConfiguration(confMap);
-      inputBuilder.setAdditionalConfiguration(confMap);
-      return this;
-    }
-
-    @Override
-    public Builder setFromConfiguration(Configuration conf) {
-      outputBuilder.setFromConfiguration(conf);
-      inputBuilder.setFromConfiguration(conf);
-      return this;
-    }
-
-    /**
-     * Set serialization class responsible for providing serializer/deserializer for key/value and
-     * the corresponding comparator class to be used as key comparator.
-     *
-     * @param serializationClassName
-     * @param serializerConf         the serializer configuration. This can be null, and is a
-     *                               {@link java.util.Map} of key-value pairs. The keys should be limited
-     *                               to the ones required by the comparator.
-     * @return
-     */
-    public Builder setKeySerializationClass(String serializationClassName,
-                                            @Nullable Map<String, String> serializerConf) {
-      outputBuilder.setKeySerializationClass(serializationClassName, serializerConf);
-      inputBuilder.setKeySerializationClass(serializationClassName, serializerConf);
-      return this;
-    }
-
-    /**
-     * Set serialization class responsible for providing serializer/deserializer for values.
-     *
-     * @param serializationClassName
-     * @param serializerConf         the serializer configuration. This can be null, and is a
-     *                               {@link java.util.Map} of key-value pairs. The keys should be limited
-     *                               to the ones required by the comparator.
-     * @return
-     */
-    public Builder setValueSerializationClass(String serializationClassName,
-                                              @Nullable Map<String, String> serializerConf) {
-      outputBuilder.setValueSerializationClass(serializationClassName, serializerConf);
-      inputBuilder.setValueSerializationClass(serializationClassName, serializerConf);
-      return this;
-    }
-
-    /**
-     * Configure the specific output
-     * @return a builder to configure the output
-     */
-    public UnorderedKVOutputConfigurer.SpecificBuilder<Builder> configureOutput() {
-      return specificOutputBuilder;
-    }
-
-    /**
-     * Configure the specific input
-     * @return a builder to configure the input
-     */
-    public UnorderedKVInputConfigurer.SpecificBuilder<Builder> configureInput() {
-      return specificInputBuilder;
-    }
-
-    /**
-     * Build and return an instance of the configuration
-     * @return an instance of the acatual configuration
-     */
-    public UnorderedKVEdgeConfigurer build() {
-      return new UnorderedKVEdgeConfigurer(outputBuilder.build(), inputBuilder.build());
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/b526ed5c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVInputConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVInputConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVInputConfig.java
new file mode 100644
index 0000000..d68e140
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVInputConfig.java
@@ -0,0 +1,358 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.tez.runtime.library.conf;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Map;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.input.UnorderedKVInput;
+
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+/**
+ * Configure {@link org.apache.tez.runtime.library.input.UnorderedKVInput} <p>
+ *
+ * Values will be picked up from tez-site if not specified, otherwise defaults from
+ * {@link org.apache.tez.runtime.library.api.TezRuntimeConfiguration} will be used.
+ */
+public class UnorderedKVInputConfig {
+
+  /**
+   * Configure parameters which are specific to the Input.
+   */
+  @InterfaceAudience.Private
+  public static interface SpecificConfigBuilder<T> extends BaseConfigBuilder<T> {
+
+    /**
+     * Sets the buffer fraction, as a fraction of container size, to be used while fetching remote
+     * data.
+     *
+     * @param shuffleBufferFraction fraction of container size
+     * @return instance of the current builder
+     */
+    public T setShuffleBufferFraction(float shuffleBufferFraction);
+
+    /**
+     * Sets a size limit on the maximum segment size to be shuffled to disk. This is a fraction of
+     * the shuffle buffer.
+     *
+     * @param maxSingleSegmentFraction fraction of memory determined by ShuffleBufferFraction
+     * @return instance of the current builder
+     */
+    public T setMaxSingleMemorySegmentFraction(float maxSingleSegmentFraction);
+
+    /**
+     * Configure the point at which in memory segments will be merged and written out to a single
+     * large disk segment. This is specified as a
+     * fraction of the shuffle buffer. <p> Has no effect at the moment.
+     *
+     * @param mergeFraction fraction of memory determined by ShuffleBufferFraction, which when
+     *                      filled, will
+     *                      trigger a merge
+     * @return instance of the current builder
+     */
+    public T setMergeFraction(float mergeFraction);
+
+  }
+
+  @SuppressWarnings("rawtypes")
+  @InterfaceAudience.Public
+  @InterfaceStability.Evolving
+  public static class SpecificBuilder<E extends HadoopKeyValuesBasedBaseEdgeConfig.Builder> implements
+      SpecificConfigBuilder<SpecificBuilder> {
+
+    private final E edgeBuilder;
+    private final UnorderedKVInputConfig.Builder builder;
+
+
+    @InterfaceAudience.Private
+    SpecificBuilder(E edgeBuilder, UnorderedKVInputConfig.Builder builder) {
+      this.edgeBuilder = edgeBuilder;
+      this.builder = builder;
+    }
+
+    @Override
+    public SpecificBuilder<E> setShuffleBufferFraction(float shuffleBufferFraction) {
+      builder.setShuffleBufferFraction(shuffleBufferFraction);
+      return this;
+    }
+
+    @Override
+    public SpecificBuilder<E> setMaxSingleMemorySegmentFraction(float maxSingleSegmentFraction) {
+      builder.setMaxSingleMemorySegmentFraction(maxSingleSegmentFraction);
+      return this;
+    }
+
+    @Override
+    public SpecificBuilder<E> setMergeFraction(float mergeFraction) {
+      builder.setMergeFraction(mergeFraction);
+      return this;
+    }
+
+    @Override
+    public SpecificBuilder setAdditionalConfiguration(String key, String value) {
+      builder.setAdditionalConfiguration(key, value);
+      return this;
+    }
+
+    @Override
+    public SpecificBuilder setAdditionalConfiguration(Map<String, String> confMap) {
+      builder.setAdditionalConfiguration(confMap);
+      return this;
+    }
+
+    @Override
+    public SpecificBuilder setFromConfiguration(Configuration conf) {
+      builder.setFromConfiguration(conf);
+      return this;
+    }
+
+    public E done() {
+      return edgeBuilder;
+    }
+
+  }
+
+  @InterfaceAudience.Private
+  @VisibleForTesting
+  Configuration conf;
+
+  @InterfaceAudience.Private
+  @VisibleForTesting
+  UnorderedKVInputConfig() {
+  }
+
+  private UnorderedKVInputConfig(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Get a UserPayload representation of the Configuration
+   * @return a {@link org.apache.tez.dag.api.UserPayload} instance
+   */
+  public UserPayload toUserPayload() {
+    try {
+      return TezUtils.createUserPayloadFromConf(conf);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @InterfaceAudience.Private
+  public void fromUserPayload(UserPayload payload) {
+    try {
+      this.conf = TezUtils.createConfFromUserPayload(payload);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public static Builder newBuilder(String keyClass, String valueClass) {
+    return new Builder(keyClass, valueClass);
+  }
+
+  @InterfaceAudience.Public
+  @InterfaceStability.Evolving
+  public static class Builder implements SpecificConfigBuilder<Builder> {
+
+    private final Configuration conf = new Configuration(false);
+
+    /**
+     * Create a configuration builder for {@link org.apache.tez.runtime.library.input.UnorderedKVInput}
+     *
+     * @param keyClassName         the key class name
+     * @param valueClassName       the value class name
+     */
+    @InterfaceAudience.Private
+    Builder(String keyClassName, String valueClassName) {
+      this();
+      Preconditions.checkNotNull(keyClassName, "Key class name cannot be null");
+      Preconditions.checkNotNull(valueClassName, "Value class name cannot be null");
+      setKeyClassName(keyClassName);
+      setValueClassName(valueClassName);
+    }
+
+    @InterfaceAudience.Private
+    Builder() {
+      Map<String, String> tezDefaults = ConfigUtils
+          .extractConfigurationMap(TezRuntimeConfiguration.getTezRuntimeConfigDefaults(),
+              UnorderedKVInput.getConfigurationKeySet());
+      ConfigUtils.addConfigMapToConfiguration(this.conf, tezDefaults);
+      ConfigUtils.addConfigMapToConfiguration(this.conf, TezRuntimeConfiguration.getOtherConfigDefaults());
+    }
+
+    @InterfaceAudience.Private
+    Builder setKeyClassName(String keyClassName) {
+      Preconditions.checkNotNull(keyClassName, "Key class name cannot be null");
+      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, keyClassName);
+      return this;
+    }
+
+    @InterfaceAudience.Private
+    Builder setValueClassName(String valueClassName) {
+      Preconditions.checkNotNull(valueClassName, "Value class name cannot be null");
+      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, valueClassName);
+      return this;
+    }
+
+    @Override
+    public Builder setShuffleBufferFraction(float shuffleBufferFraction) {
+      this.conf
+          .setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, shuffleBufferFraction);
+      return this;
+    }
+
+    @Override
+    public Builder setMaxSingleMemorySegmentFraction(float maxSingleSegmentFraction) {
+      this.conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT,
+          maxSingleSegmentFraction);
+      return this;
+    }
+
+    @Override
+    public Builder setMergeFraction(float mergeFraction) {
+      this.conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, mergeFraction);
+      return this;
+    }
+
+    @Override
+    public Builder setAdditionalConfiguration(String key, String value) {
+      Preconditions.checkNotNull(key, "Key cannot be null");
+      if (ConfigUtils.doesKeyQualify(key,
+          Lists.newArrayList(UnorderedKVInput.getConfigurationKeySet(),
+              TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()),
+          TezRuntimeConfiguration.getAllowedPrefixes())) {
+        if (value == null) {
+          this.conf.unset(key);
+        } else {
+          this.conf.set(key, value);
+        }
+      }
+      return this;
+    }
+
+    @Override
+    public Builder setAdditionalConfiguration(Map<String, String> confMap) {
+      Preconditions.checkNotNull(confMap, "ConfMap cannot be null");
+      Map<String, String> map = ConfigUtils.extractConfigurationMap(confMap,
+          Lists.newArrayList(UnorderedKVInput.getConfigurationKeySet(),
+              TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()), TezRuntimeConfiguration.getAllowedPrefixes());
+      ConfigUtils.addConfigMapToConfiguration(this.conf, map);
+      return this;
+    }
+
+    @Override
+    public Builder setFromConfiguration(Configuration conf) {
+      // Maybe ensure this is the first call ? Otherwise this can end up overriding other parameters
+      Preconditions.checkArgument(conf != null, "Configuration cannot be null");
+      Map<String, String> map = ConfigUtils.extractConfigurationMap(conf,
+          Lists.newArrayList(UnorderedKVInput.getConfigurationKeySet(),
+              TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()), TezRuntimeConfiguration.getAllowedPrefixes());
+      ConfigUtils.addConfigMapToConfiguration(this.conf, map);
+      return this;
+    }
+
+    public Builder setCompression(boolean enabled, @Nullable String compressionCodec,
+                                  @Nullable Map<String, String> codecConf) {
+      this.conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, enabled);
+      if (enabled && compressionCodec != null) {
+        this.conf
+            .set(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, compressionCodec);
+      }
+      if (codecConf != null) {
+        // Merging the confs for now. Change to be specific in the future.
+        ConfigUtils.mergeConfsWithExclusions(this.conf, codecConf,
+            TezRuntimeConfiguration.getRuntimeConfigKeySet());
+      }
+      return this;
+    }
+
+    /**
+     * Set the serialization class responsible for providing the serializer/deserializer for keys.
+     * The class name is prepended to the currently configured list of serializations.
+     *
+     * @param serializationClassName the serialization class name. Cannot be null.
+     * @param serializerConf         the serializer configuration. This can be null, and is a
+     *                               {@link java.util.Map} of key-value pairs. The keys should be limited
+     *                               to the ones required by the serialization class.
+     * @return this builder, to allow chained calls
+     */
+    public Builder setKeySerializationClass(String serializationClassName,
+                                            @Nullable Map<String, String> serializerConf) {
+      Preconditions.checkArgument(serializationClassName != null,
+          "serializationClassName cannot be null");
+      this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + ","
+          + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
+      if (serializerConf != null) {
+        // Merging the confs for now. Change to be specific in the future.
+        ConfigUtils.mergeConfsWithExclusions(this.conf, serializerConf,
+            TezRuntimeConfiguration.getRuntimeConfigKeySet());
+      }
+      return this;
+    }
+
+    /**
+     * Set serialization class responsible for providing serializer/deserializer for values.
+     *
+     * @param serializationClassName the serialization class name. Cannot be null.
+     * @param serializerConf         the serializer configuration. This can be null, and is a
+     *                               {@link java.util.Map} of key-value pairs. The keys should be limited
+     *                               to the ones required by the serialization class.
+     * @return this builder, to allow chained calls
+     */
+    public Builder setValueSerializationClass(String serializationClassName,
+                                              @Nullable Map<String, String> serializerConf) {
+      Preconditions.checkArgument(serializationClassName != null,
+          "serializationClassName cannot be null");
+      this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + ","
+          + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
+      if (serializerConf != null) {
+        // Merging the confs for now. Change to be specific in the future.
+        ConfigUtils.mergeConfsWithExclusions(this.conf, serializerConf,
+            TezRuntimeConfiguration.getRuntimeConfigKeySet());
+      }
+      return this;
+    }
+
+    /**
+     * Create the actual configuration instance.
+     *
+     * @return an instance of the Configuration
+     */
+    public UnorderedKVInputConfig build() {
+      return new UnorderedKVInputConfig(this.conf);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/b526ed5c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVInputConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVInputConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVInputConfigurer.java
deleted file mode 100644
index 8771c97..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVInputConfigurer.java
+++ /dev/null
@@ -1,358 +0,0 @@
-/*
- * *
- *  * Licensed to the Apache Software Foundation (ASF) under one
- *  * or more contributor license agreements.  See the NOTICE file
- *  * distributed with this work for additional information
- *  * regarding copyright ownership.  The ASF licenses this file
- *  * to you under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance
- *  * with the License.  You may obtain a copy of the License at
- *  *
- *  *     http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.tez.runtime.library.conf;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.util.Map;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
-import org.apache.tez.runtime.library.common.ConfigUtils;
-import org.apache.tez.runtime.library.input.UnorderedKVInput;
-
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-/**
- * Configure {@link org.apache.tez.runtime.library.input.UnorderedKVInput} </p>
- *
- * Values will be picked up from tez-site if not specified, otherwise defaults from
- * {@link org.apache.tez.runtime.library.api.TezRuntimeConfiguration} will be used.
- */
-public class UnorderedKVInputConfigurer {
-
-  /**
-   * Configure parameters which are specific to the Input.
-   */
-  @InterfaceAudience.Private
-  public static interface SpecificConfigurer<T> extends BaseConfigurer<T> {
-
-    /**
-     * Sets the buffer fraction, as a fraction of container size, to be used while fetching remote
-     * data.
-     *
-     * @param shuffleBufferFraction fraction of container size
-     * @return instance of the current builder
-     */
-    public T setShuffleBufferFraction(float shuffleBufferFraction);
-
-    /**
-     * Sets a size limit on the maximum segment size to be shuffled to disk. This is a fraction of
-     * the shuffle buffer.
-     *
-     * @param maxSingleSegmentFraction fraction of memory determined by ShuffleBufferFraction
-     * @return instance of the current builder
-     */
-    public T setMaxSingleMemorySegmentFraction(float maxSingleSegmentFraction);
-
-    /**
-     * Configure the point at which in memory segments will be merged and written out to a single
-     * large disk segment. This is specified as a
-     * fraction of the shuffle buffer. </p> Has no affect at the moment.
-     *
-     * @param mergeFraction fraction of memory determined by ShuffleBufferFraction, which when
-     *                      filled, will
-     *                      trigger a merge
-     * @return instance of the current builder
-     */
-    public T setMergeFraction(float mergeFraction);
-
-  }
-
-  @SuppressWarnings("rawtypes")
-  @InterfaceAudience.Public
-  @InterfaceStability.Evolving
-  public static class SpecificBuilder<E extends HadoopKeyValuesBasedBaseEdgeConfigurer.Builder> implements
-      SpecificConfigurer<SpecificBuilder> {
-
-    private final E edgeBuilder;
-    private final UnorderedKVInputConfigurer.Builder builder;
-
-
-    @InterfaceAudience.Private
-    SpecificBuilder(E edgeBuilder, UnorderedKVInputConfigurer.Builder builder) {
-      this.edgeBuilder = edgeBuilder;
-      this.builder = builder;
-    }
-
-    @Override
-    public SpecificBuilder<E> setShuffleBufferFraction(float shuffleBufferFraction) {
-      builder.setShuffleBufferFraction(shuffleBufferFraction);
-      return this;
-    }
-
-    @Override
-    public SpecificBuilder<E> setMaxSingleMemorySegmentFraction(float maxSingleSegmentFraction) {
-      builder.setMaxSingleMemorySegmentFraction(maxSingleSegmentFraction);
-      return this;
-    }
-
-    @Override
-    public SpecificBuilder<E> setMergeFraction(float mergeFraction) {
-      builder.setMergeFraction(mergeFraction);
-      return this;
-    }
-
-    @Override
-    public SpecificBuilder setAdditionalConfiguration(String key, String value) {
-      builder.setAdditionalConfiguration(key, value);
-      return this;
-    }
-
-    @Override
-    public SpecificBuilder setAdditionalConfiguration(Map<String, String> confMap) {
-      builder.setAdditionalConfiguration(confMap);
-      return this;
-    }
-
-    @Override
-    public SpecificBuilder setFromConfiguration(Configuration conf) {
-      builder.setFromConfiguration(conf);
-      return this;
-    }
-
-    public E done() {
-      return edgeBuilder;
-    }
-
-  }
-
-  @InterfaceAudience.Private
-  @VisibleForTesting
-  Configuration conf;
-
-  @InterfaceAudience.Private
-  @VisibleForTesting
-  UnorderedKVInputConfigurer() {
-  }
-
-  private UnorderedKVInputConfigurer(Configuration conf) {
-    this.conf = conf;
-  }
-
-  /**
-   * Get a UserPayload representation of the Configuration
-   * @return a {@link org.apache.tez.dag.api.UserPayload} instance
-   */
-  public UserPayload toUserPayload() {
-    try {
-      return TezUtils.createUserPayloadFromConf(conf);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @InterfaceAudience.Private
-  public void fromUserPayload(UserPayload payload) {
-    try {
-      this.conf = TezUtils.createConfFromUserPayload(payload);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  public static Builder newBuilder(String keyClass, String valueClass) {
-    return new Builder(keyClass, valueClass);
-  }
-
-  @InterfaceAudience.Public
-  @InterfaceStability.Evolving
-  public static class Builder implements SpecificConfigurer<Builder> {
-
-    private final Configuration conf = new Configuration(false);
-
-    /**
-     * Create a configuration builder for {@link org.apache.tez.runtime.library.input.UnorderedKVInput}
-     *
-     * @param keyClassName         the key class name
-     * @param valueClassName       the value class name
-     */
-    @InterfaceAudience.Private
-    Builder(String keyClassName, String valueClassName) {
-      this();
-      Preconditions.checkNotNull(keyClassName, "Key class name cannot be null");
-      Preconditions.checkNotNull(valueClassName, "Value class name cannot be null");
-      setKeyClassName(keyClassName);
-      setValueClassName(valueClassName);
-    }
-
-    @InterfaceAudience.Private
-    Builder() {
-      Map<String, String> tezDefaults = ConfigUtils
-          .extractConfigurationMap(TezRuntimeConfiguration.getTezRuntimeConfigDefaults(),
-              UnorderedKVInput.getConfigurationKeySet());
-      ConfigUtils.addConfigMapToConfiguration(this.conf, tezDefaults);
-      ConfigUtils.addConfigMapToConfiguration(this.conf, TezRuntimeConfiguration.getOtherConfigDefaults());
-    }
-
-    @InterfaceAudience.Private
-    Builder setKeyClassName(String keyClassName) {
-      Preconditions.checkNotNull(keyClassName, "Key class name cannot be null");
-      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, keyClassName);
-      return this;
-    }
-
-    @InterfaceAudience.Private
-    Builder setValueClassName(String valueClassName) {
-      Preconditions.checkNotNull(valueClassName, "Value class name cannot be null");
-      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, valueClassName);
-      return this;
-    }
-
-    @Override
-    public Builder setShuffleBufferFraction(float shuffleBufferFraction) {
-      this.conf
-          .setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, shuffleBufferFraction);
-      return this;
-    }
-
-    @Override
-    public Builder setMaxSingleMemorySegmentFraction(float maxSingleSegmentFraction) {
-      this.conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT,
-          maxSingleSegmentFraction);
-      return this;
-    }
-
-    @Override
-    public Builder setMergeFraction(float mergeFraction) {
-      this.conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, mergeFraction);
-      return this;
-    }
-
-    @Override
-    public Builder setAdditionalConfiguration(String key, String value) {
-      Preconditions.checkNotNull(key, "Key cannot be null");
-      if (ConfigUtils.doesKeyQualify(key,
-          Lists.newArrayList(UnorderedKVInput.getConfigurationKeySet(),
-              TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()),
-          TezRuntimeConfiguration.getAllowedPrefixes())) {
-        if (value == null) {
-          this.conf.unset(key);
-        } else {
-          this.conf.set(key, value);
-        }
-      }
-      return this;
-    }
-
-    @Override
-    public Builder setAdditionalConfiguration(Map<String, String> confMap) {
-      Preconditions.checkNotNull(confMap, "ConfMap cannot be null");
-      Map<String, String> map = ConfigUtils.extractConfigurationMap(confMap,
-          Lists.newArrayList(UnorderedKVInput.getConfigurationKeySet(),
-              TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()), TezRuntimeConfiguration.getAllowedPrefixes());
-      ConfigUtils.addConfigMapToConfiguration(this.conf, map);
-      return this;
-    }
-
-    @Override
-    public Builder setFromConfiguration(Configuration conf) {
-      // Maybe ensure this is the first call ? Otherwise this can end up overriding other parameters
-      Preconditions.checkArgument(conf != null, "Configuration cannot be null");
-      Map<String, String> map = ConfigUtils.extractConfigurationMap(conf,
-          Lists.newArrayList(UnorderedKVInput.getConfigurationKeySet(),
-              TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()), TezRuntimeConfiguration.getAllowedPrefixes());
-      ConfigUtils.addConfigMapToConfiguration(this.conf, map);
-      return this;
-    }
-
-    public Builder setCompression(boolean enabled, @Nullable String compressionCodec,
-                                  @Nullable Map<String, String> codecConf) {
-      this.conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, enabled);
-      if (enabled && compressionCodec != null) {
-        this.conf
-            .set(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, compressionCodec);
-      }
-      if (codecConf != null) {
-        // Merging the confs for now. Change to be specific in the future.
-        ConfigUtils.mergeConfsWithExclusions(this.conf, codecConf,
-            TezRuntimeConfiguration.getRuntimeConfigKeySet());
-      }
-      return this;
-    }
-
-    /**
-     * Set serialization class responsible for providing serializer/deserializer for key/value and
-     * the corresponding comparator class to be used as key comparator.
-     *
-     * @param serializationClassName
-     * @param serializerConf         the serializer configuration. This can be null, and is a
-     *                               {@link java.util.Map} of key-value pairs. The keys should be limited
-     *                               to the ones required by the comparator.
-     * @return
-     */
-    public Builder setKeySerializationClass(String serializationClassName,
-                                            @Nullable Map<String, String> serializerConf) {
-      Preconditions.checkArgument(serializationClassName != null,
-          "serializationClassName cannot be null");
-      this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + ","
-          + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
-      if (serializerConf != null) {
-        // Merging the confs for now. Change to be specific in the future.
-        ConfigUtils.mergeConfsWithExclusions(this.conf, serializerConf,
-            TezRuntimeConfiguration.getRuntimeConfigKeySet());
-      }
-      return this;
-    }
-
-    /**
-     * Set serialization class responsible for providing serializer/deserializer for values.
-     *
-     * @param serializationClassName
-     * @param serializerConf         the serializer configuration. This can be null, and is a
-     *                               {@link java.util.Map} of key-value pairs. The keys should be limited
-     *                               to the ones required by the comparator.
-     * @return
-     */
-    public Builder setValueSerializationClass(String serializationClassName,
-                                              @Nullable Map<String, String> serializerConf) {
-      Preconditions.checkArgument(serializationClassName != null,
-          "serializationClassName cannot be null");
-      this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + ","
-          + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
-      if (serializerConf != null) {
-        // Merging the confs for now. Change to be specific in the future.
-        ConfigUtils.mergeConfsWithExclusions(this.conf, serializerConf,
-            TezRuntimeConfiguration.getRuntimeConfigKeySet());
-      }
-      return this;
-    }
-
-    /**
-     * Create the actual configuration instance.
-     *
-     * @return an instance of the Configuration
-     */
-    public UnorderedKVInputConfigurer build() {
-      return new UnorderedKVInputConfigurer(this.conf);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/b526ed5c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVOutputConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVOutputConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVOutputConfig.java
new file mode 100644
index 0000000..b4e2c2c
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVOutputConfig.java
@@ -0,0 +1,283 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.tez.runtime.library.conf;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Map;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.output.UnorderedKVOutput;
+
+/**
+ * Configure {@link org.apache.tez.runtime.library.output.UnorderedKVOutput}.
+ * <p>
+ * Values will be picked up from tez-site if not specified, otherwise defaults from
+ * {@link org.apache.tez.runtime.library.api.TezRuntimeConfiguration} will be used.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class UnorderedKVOutputConfig {
+  /**
+   * Configure parameters which are specific to the Output.
+   */
+  @InterfaceAudience.Private
+  public static interface SpecificConfigBuilder<T> extends BaseConfigBuilder<T> {
+  }
+
+  @InterfaceAudience.Public
+  @InterfaceStability.Evolving
+  public static class SpecificBuilder<E extends HadoopKeyValuesBasedBaseEdgeConfig.Builder> implements
+      SpecificConfigBuilder<SpecificBuilder<E>> {
+
+    private final E edgeBuilder;   // enclosing edge builder, handed back by done()
+    private final Builder builder; // receives every delegated setter call below
+
+    SpecificBuilder(E edgeBuilder, Builder builder) {
+      this.edgeBuilder = edgeBuilder;
+      this.builder = builder;
+    }
+
+    @Override
+    public SpecificBuilder<E> setAdditionalConfiguration(String key, String value) {
+      builder.setAdditionalConfiguration(key, value);
+      return this;
+    }
+
+    @Override
+    public SpecificBuilder<E> setAdditionalConfiguration(Map<String, String> confMap) {
+      builder.setAdditionalConfiguration(confMap);
+      return this;
+    }
+
+    @Override
+    public SpecificBuilder<E> setFromConfiguration(Configuration conf) {
+      builder.setFromConfiguration(conf);
+      return this;
+    }
+
+    public E done() { // leaves the output-specific scope; returns the edge builder given at construction
+      return edgeBuilder;
+    }
+  }
+
+  @InterfaceAudience.Private
+  @VisibleForTesting
+  Configuration conf;
+
+  @InterfaceAudience.Private
+  @VisibleForTesting
+  UnorderedKVOutputConfig() {
+  }
+
+  private UnorderedKVOutputConfig(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Get a UserPayload representation of the Configuration
+   * @return a {@link org.apache.tez.dag.api.UserPayload} instance
+   */
+  public UserPayload toUserPayload() {
+    try {
+      return TezUtils.createUserPayloadFromConf(conf);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @InterfaceAudience.Private
+  public void fromUserPayload(UserPayload payload) {
+    try {
+      this.conf = TezUtils.createConfFromUserPayload(payload);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public static Builder newBuilder(String keyClass, String valClass) {
+    return new Builder(keyClass, valClass);
+  }
+
+  @InterfaceAudience.Public
+  @InterfaceStability.Evolving
+  public static class Builder implements SpecificConfigBuilder<Builder> {
+
+    private final Configuration conf = new Configuration(false);
+
+    /**
+     * Create a configuration builder for {@link org.apache.tez.runtime.library.output.UnorderedKVOutput}
+     *
+     * @param keyClassName         the key class name
+     * @param valueClassName       the value class name
+     */
+    @InterfaceAudience.Private
+    Builder(String keyClassName, String valueClassName) {
+      this();
+      Preconditions.checkNotNull(keyClassName, "Key class name cannot be null");
+      Preconditions.checkNotNull(valueClassName, "Value class name cannot be null");
+      setKeyClassName(keyClassName);
+      setValueClassName(valueClassName);
+    }
+
+    @InterfaceAudience.Private
+    Builder() {
+      Map<String, String> tezDefaults = ConfigUtils
+          .extractConfigurationMap(TezRuntimeConfiguration.getTezRuntimeConfigDefaults(),
+              UnorderedKVOutput.getConfigurationKeySet());
+      ConfigUtils.addConfigMapToConfiguration(this.conf, tezDefaults);
+      ConfigUtils.addConfigMapToConfiguration(this.conf, TezRuntimeConfiguration.getOtherConfigDefaults());
+    }
+
+    @InterfaceAudience.Private
+    Builder setKeyClassName(String keyClassName) {
+      Preconditions.checkNotNull(keyClassName, "Key class name cannot be null");
+      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, keyClassName);
+      return this;
+    }
+
+    @InterfaceAudience.Private
+    Builder setValueClassName(String valueClassName) {
+      Preconditions.checkNotNull(valueClassName, "Value class name cannot be null");
+      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, valueClassName);
+      return this;
+    }
+
+    @Override
+    public Builder setAdditionalConfiguration(String key, String value) {
+      Preconditions.checkNotNull(key, "Key cannot be null");
+      if (ConfigUtils.doesKeyQualify(key,
+          Lists.newArrayList(UnorderedKVOutput.getConfigurationKeySet(),
+              TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()),
+          TezRuntimeConfiguration.getAllowedPrefixes())) {
+        if (value == null) {
+          this.conf.unset(key);
+        } else {
+          this.conf.set(key, value);
+        }
+      }
+      return this;
+    }
+
+    @Override
+    public Builder setAdditionalConfiguration(Map<String, String> confMap) {
+      Preconditions.checkNotNull(confMap, "ConfMap cannot be null");
+      Map<String, String> map = ConfigUtils.extractConfigurationMap(confMap,
+          Lists.newArrayList(UnorderedKVOutput.getConfigurationKeySet(),
+              TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()), TezRuntimeConfiguration.getAllowedPrefixes());
+      ConfigUtils.addConfigMapToConfiguration(this.conf, map);
+      return this;
+    }
+
+    @Override
+    public Builder setFromConfiguration(Configuration conf) {
+      // Maybe ensure this is the first call ? Otherwise this can end up overriding other parameters
+      Preconditions.checkArgument(conf != null, "Configuration cannot be null");
+      Map<String, String> map = ConfigUtils.extractConfigurationMap(conf,
+          Lists.newArrayList(UnorderedKVOutput.getConfigurationKeySet(),
+              TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()), TezRuntimeConfiguration.getAllowedPrefixes());
+      ConfigUtils.addConfigMapToConfiguration(this.conf, map);
+      return this;
+    }
+
+    /**
+     * Set serialization class responsible for providing serializer/deserializer for keys.
+     *
+     * @param serializationClassName the serialization class name, prepended to those already set
+     * @param serializerConf         the serializer configuration. This can be null. The keys should
+     *                               be limited to the ones required by the serialization.
+     * @return this builder, for call chaining
+     */
+    public Builder setKeySerializationClass(String serializationClassName,
+                                            @Nullable Map<String, String> serializerConf) {
+      Preconditions.checkArgument(serializationClassName != null,
+          "serializationClassName cannot be null");
+      String existing = conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY); // null when unset
+      this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
+          existing == null ? serializationClassName : serializationClassName + "," + existing);
+      if (serializerConf != null) {
+        // Merging the confs for now. Change to be specific in the future.
+        ConfigUtils.mergeConfsWithExclusions(this.conf, serializerConf,
+            TezRuntimeConfiguration.getRuntimeConfigKeySet());
+      }
+      return this;
+    }
+
+    /**
+     * Set serialization class responsible for providing serializer/deserializer for values.
+     *
+     * @param serializationClassName the serialization class name, prepended to those already set
+     * @param serializerConf         the serializer configuration. This can be null. The keys should
+     *                               be limited to the ones required by the serialization.
+     * @return this builder, for call chaining
+     */
+    public Builder setValueSerializationClass(String serializationClassName,
+                                              @Nullable Map<String, String> serializerConf) {
+      Preconditions.checkArgument(serializationClassName != null,
+          "serializationClassName cannot be null");
+      String existing = conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY); // null when unset
+      this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
+          existing == null ? serializationClassName : serializationClassName + "," + existing);
+      if (serializerConf != null) {
+        // Merging the confs for now. Change to be specific in the future.
+        ConfigUtils.mergeConfsWithExclusions(this.conf, serializerConf,
+            TezRuntimeConfiguration.getRuntimeConfigKeySet());
+      }
+      return this;
+    }
+
+    public Builder setCompression(boolean enabled, @Nullable String compressionCodec,
+                                  @Nullable Map<String, String> codecConf) {
+      this.conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, enabled);
+      if (enabled && compressionCodec != null) {
+        this.conf
+            .set(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, compressionCodec);
+      }
+      if (codecConf != null) {
+        // Merging the confs for now. Change to be specific in the future.
+        ConfigUtils.mergeConfsWithExclusions(this.conf, codecConf,
+            TezRuntimeConfiguration.getRuntimeConfigKeySet());
+      }
+      return this;
+    }
+
+    /**
+     * Create the actual configuration instance.
+     *
+     * @return an instance of the Configuration
+     */
+    public UnorderedKVOutputConfig build() {
+      return new UnorderedKVOutputConfig(this.conf);
+    }
+  }
+}


Mime
View raw message