tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [5/6] TEZ-1417. Rename *Configurer to ConfigBuilder/Config. (sseth)
Date Tue, 19 Aug 2014 00:02:11 GMT
http://git-wip-us.apache.org/repos/asf/tez/blob/b526ed5c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedGroupedKVInputConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedGroupedKVInputConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedGroupedKVInputConfig.java
new file mode 100644
index 0000000..9d08239
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedGroupedKVInputConfig.java
@@ -0,0 +1,515 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.tez.runtime.library.conf;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Map;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.input.OrderedGroupedKVInput;
+import org.apache.tez.runtime.library.input.OrderedGroupedInputLegacy;
+
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+/**
+ * Configure {@link org.apache.tez.runtime.library.input.OrderedGroupedKVInput}. <p>
+ *
+ * Values will be picked up from tez-site if not specified, otherwise defaults from
+ * {@link org.apache.tez.runtime.library.api.TezRuntimeConfiguration} will be used.
+ */
+public class OrderedGroupedKVInputConfig {
+
+  /**
+   * Configure parameters which are specific to the Input.
+   */
+  @InterfaceAudience.Private
+  public static interface SpecificConfigBuilder<T> extends BaseConfigBuilder<T> {
+
+    /**
+     * Specifies whether the legacy version of this input should be used.
+     * @return instance of the current builder
+     */
+    public T useLegacyInput();
+    /**
+     * Sets the buffer fraction, as a fraction of container size, to be used while fetching remote
+     * data.
+     *
+     * @param shuffleBufferFraction fraction of container size
+     * @return instance of the current builder
+     */
+    public T setShuffleBufferFraction(float shuffleBufferFraction);
+
+    /**
+     * Sets the buffer fraction, as a fraction of container size, to be used after the fetch and
+     * merge are complete. This buffer is used to cache merged data and avoids writing it out to
+     * disk.
+     *
+     * @param postMergeBufferFraction fraction of container size
+     * @return instance of the current builder
+     */
+    public T setPostMergeBufferFraction(float postMergeBufferFraction);
+
+    /**
+     * Sets a size limit on the maximum segment size to be shuffled to disk. This is a fraction of
+     * the shuffle buffer.
+     *
+     * @param maxSingleSegmentFraction fraction of memory determined by ShuffleBufferFraction
+     * @return instance of the current builder
+     */
+    public T setMaxSingleMemorySegmentFraction(float maxSingleSegmentFraction);
+
+    /**
+     * Enable the memory to memory merger
+     *
+     * @param enable whether to enable the memory to memory merger
+     * @return instance of the current builder
+     */
+    public T setMemToMemMerger(boolean enable); // Not super useful until additional params are used.
+
+    /**
+     * Configure the point at which in memory segments will be merged. This is specified as a
+     * fraction of the shuffle buffer.
+     *
+     * @param mergeFraction fraction of memory determined by ShuffleBufferFraction, which when
+     *                      filled, will
+     *                      trigger a merge
+     * @return instance of the current builder
+     */
+    public T setMergeFraction(float mergeFraction);
+
+    /**
+     * Configure the combiner class
+     *
+     * @param combinerClassName the combiner class name
+     * @return instance of the current builder
+     */
+    public T setCombiner(String combinerClassName);
+
+    /**
+     * Configure the combiner class and its associated configuration (specified as key-value
+     * pairs). This method should only be used if the combiner requires some specific configuration.
+     * {@link #setCombiner(String)} is the preferred method for setting a combiner.
+     *
+     * @param combinerClassName the combiner class name
+     * @param combinerConf      the combiner configuration. This can be null, and otherwise
+     *                          is a {@link java.util.Map} of key-value pairs. The keys should
+     *                          be limited to the ones required by the combiner.
+     * @return instance of the current builder
+     */
+    public T setCombiner(String combinerClassName, @Nullable Map<String, String> combinerConf);
+
+  }
+
+  @SuppressWarnings("rawtypes")
+  @InterfaceAudience.Public
+  @InterfaceStability.Evolving
+  public static class SpecificBuilder<E extends HadoopKeyValuesBasedBaseEdgeConfig.Builder> implements
+      SpecificConfigBuilder<SpecificBuilder> {
+
+    private final E edgeBuilder;
+    private final OrderedGroupedKVInputConfig.Builder builder;
+
+
+    @InterfaceAudience.Private
+    SpecificBuilder(E edgeBuilder, OrderedGroupedKVInputConfig.Builder builder) {
+      this.edgeBuilder = edgeBuilder;
+      this.builder = builder;
+    }
+
+    public SpecificBuilder<E> useLegacyInput() {
+      builder.useLegacyInput();
+      return this;
+    }
+
+    @Override
+    public SpecificBuilder<E> setShuffleBufferFraction(float shuffleBufferFraction) {
+      builder.setShuffleBufferFraction(shuffleBufferFraction);
+      return this;
+    }
+
+    @Override
+    public SpecificBuilder<E> setPostMergeBufferFraction(float postMergeBufferFraction) {
+      builder.setPostMergeBufferFraction(postMergeBufferFraction);
+      return this;
+    }
+
+    @Override
+    public SpecificBuilder<E> setMaxSingleMemorySegmentFraction(float maxSingleSegmentFraction) {
+      builder.setMaxSingleMemorySegmentFraction(maxSingleSegmentFraction);
+      return this;
+    }
+
+    @Override
+    public SpecificBuilder<E> setMemToMemMerger(boolean enable) {
+      builder.setMemToMemMerger(enable);
+      return this;
+    }
+
+    @Override
+    public SpecificBuilder<E> setMergeFraction(float mergeFraction) {
+      builder.setMergeFraction(mergeFraction);
+      return this;
+    }
+
+    @Override
+    public SpecificBuilder<E> setCombiner(String combinerClassName) {
+      return setCombiner(combinerClassName, null);
+    }
+
+    @Override
+    public SpecificBuilder<E> setCombiner(String combinerClassName, Map<String, String> combinerConf) {
+      builder.setCombiner(combinerClassName, combinerConf);
+      return this;
+    }
+
+
+    @Override
+    public SpecificBuilder setAdditionalConfiguration(String key, String value) {
+      builder.setAdditionalConfiguration(key, value);
+      return this;
+    }
+
+    @Override
+    public SpecificBuilder setAdditionalConfiguration(Map<String, String> confMap) {
+      builder.setAdditionalConfiguration(confMap);
+      return this;
+    }
+
+    @Override
+    public SpecificBuilder setFromConfiguration(Configuration conf) {
+      builder.setFromConfiguration(conf);
+      return this;
+    }
+
+    public E done() {
+      return edgeBuilder;
+    }
+
+  }
+
+  @InterfaceAudience.Private
+  @VisibleForTesting
+  Configuration conf;
+
+  private String inputClassName;
+
+  @InterfaceAudience.Private
+  @VisibleForTesting
+  OrderedGroupedKVInputConfig() {
+  }
+
+  private OrderedGroupedKVInputConfig(Configuration conf, boolean useLegacyInput) {
+    this.conf = conf;
+    if (useLegacyInput) {
+      inputClassName = OrderedGroupedInputLegacy.class.getName();
+    } else {
+      inputClassName = OrderedGroupedKVInput.class.getName();
+    }
+  }
+
+  /**
+   * Get a UserPayload representation of the Configuration
+   * @return a {@link org.apache.tez.dag.api.UserPayload} instance
+   */
+  public UserPayload toUserPayload() {
+    try {
+      return TezUtils.createUserPayloadFromConf(conf);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @InterfaceAudience.Private
+  public void fromUserPayload(UserPayload payload) {
+    try {
+      this.conf = TezUtils.createConfFromUserPayload(payload);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public String getInputClassName() {
+    return inputClassName;
+  }
+
+  public static Builder newBuilder(String keyClass, String valueClass) {
+    return new Builder(keyClass, valueClass);
+  }
+
+  @InterfaceAudience.Public
+  @InterfaceStability.Evolving
+  public static class Builder implements SpecificConfigBuilder<Builder> {
+
+    private final Configuration conf = new Configuration(false);
+    private boolean useLegacyInput = false;
+
+    /**
+     * Create a configuration builder for {@link org.apache.tez.runtime.library.input.OrderedGroupedKVInput}
+     *
+     * @param keyClassName         the key class name
+     * @param valueClassName       the value class name
+     */
+    @InterfaceAudience.Private
+    Builder(String keyClassName, String valueClassName) {
+      this();
+      Preconditions.checkNotNull(keyClassName, "Key class name cannot be null");
+      Preconditions.checkNotNull(valueClassName, "Value class name cannot be null");
+      setKeyClassName(keyClassName);
+      setValueClassName(valueClassName);
+    }
+
+    @InterfaceAudience.Private
+    Builder() {
+      Map<String, String> tezDefaults = ConfigUtils
+          .extractConfigurationMap(TezRuntimeConfiguration.getTezRuntimeConfigDefaults(),
+              OrderedGroupedKVInput.getConfigurationKeySet());
+      ConfigUtils.addConfigMapToConfiguration(this.conf, tezDefaults);
+      ConfigUtils.addConfigMapToConfiguration(this.conf, TezRuntimeConfiguration.getOtherConfigDefaults());
+    }
+
+    @InterfaceAudience.Private
+    Builder setKeyClassName(String keyClassName) {
+      Preconditions.checkNotNull(keyClassName, "Key class name cannot be null");
+      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, keyClassName);
+      return this;
+    }
+
+    @InterfaceAudience.Private
+    Builder setValueClassName(String valueClassName) {
+      Preconditions.checkNotNull(valueClassName, "Value class name cannot be null");
+      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, valueClassName);
+      return this;
+    }
+
+    public Builder useLegacyInput() {
+      this.useLegacyInput = true;
+      return this;
+    }
+
+    @Override
+    public Builder setShuffleBufferFraction(float shuffleBufferFraction) {
+      this.conf
+          .setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, shuffleBufferFraction);
+      return this;
+    }
+
+    @Override
+    public Builder setPostMergeBufferFraction(float postMergeBufferFraction) {
+      this.conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_BUFFER_PERCENT, postMergeBufferFraction);
+      return this;
+    }
+
+    @Override
+    public Builder setMaxSingleMemorySegmentFraction(float maxSingleSegmentFraction) {
+      this.conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT,
+          maxSingleSegmentFraction);
+      return this;
+    }
+
+    @Override
+    public Builder setMemToMemMerger(boolean enable) {
+      this.conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM, enable);
+      return this;
+    }
+
+    @Override
+    public Builder setMergeFraction(float mergeFraction) {
+      this.conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, mergeFraction);
+      return this;
+    }
+
+    public Builder setCombiner(String combinerClassName) {
+      return setCombiner(combinerClassName, null);
+    }
+
+    @Override
+    public Builder setCombiner(String combinerClassName, Map<String, String> combinerConf) {
+      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS, combinerClassName);
+      if (combinerConf != null) {
+        // Merging the confs for now. Change to be specific in the future.
+        ConfigUtils.mergeConfsWithExclusions(this.conf, combinerConf,
+            TezRuntimeConfiguration.getRuntimeConfigKeySet());
+      }
+      return this;
+    }
+
+    /**
+     * Set the key comparator class
+     *
+     * @param comparatorClassName the key comparator class name
+     * @return instance of the current builder
+     */
+    public Builder setKeyComparatorClass(String comparatorClassName) {
+      return this.setKeyComparatorClass(comparatorClassName, null);
+    }
+
+    /**
+     * Set the key comparator class and its associated configuration. This method should only be
+     * used if the comparator requires some specific configuration, which is typically not the
+     * case. {@link #setKeyComparatorClass(String)} is the preferred method for setting a
+     * comparator.
+     *
+     * @param comparatorClassName the key comparator class name
+     * @param comparatorConf      the comparator configuration. This can be null, and is a {@link
+     *                            java.util.Map} of key-value pairs. The keys should be limited to
+     *                            the ones required by the comparator.
+     * @return instance of the current builder
+     */
+    public Builder setKeyComparatorClass(String comparatorClassName,
+                                         @Nullable Map<String, String> comparatorConf) {
+      Preconditions.checkNotNull(comparatorClassName, "Comparator class name cannot be null");
+      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS,
+          comparatorClassName);
+      if (comparatorConf != null) {
+        // Merging the confs for now. Change to be specific in the future.
+        ConfigUtils.mergeConfsWithExclusions(this.conf, comparatorConf,
+            TezRuntimeConfiguration.getRuntimeConfigKeySet());
+      }
+      return this;
+    }
+    @Override
+    public Builder setAdditionalConfiguration(String key, String value) {
+      Preconditions.checkNotNull(key, "Key cannot be null");
+      if (ConfigUtils.doesKeyQualify(key,
+          Lists.newArrayList(OrderedGroupedKVInput.getConfigurationKeySet(),
+              TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()),
+          TezRuntimeConfiguration.getAllowedPrefixes())) {
+        if (value == null) {
+          this.conf.unset(key);
+        } else {
+          this.conf.set(key, value);
+        }
+      }
+      return this;
+    }
+
+    @Override
+    public Builder setAdditionalConfiguration(Map<String, String> confMap) {
+      Preconditions.checkNotNull(confMap, "ConfMap cannot be null");
+      Map<String, String> map = ConfigUtils.extractConfigurationMap(confMap,
+          Lists.newArrayList(OrderedGroupedKVInput.getConfigurationKeySet(),
+              TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()), TezRuntimeConfiguration.getAllowedPrefixes());
+      ConfigUtils.addConfigMapToConfiguration(this.conf, map);
+      return this;
+    }
+
+    @Override
+    public Builder setFromConfiguration(Configuration conf) {
+      // Maybe ensure this is the first call ? Otherwise this can end up overriding other parameters
+      Preconditions.checkArgument(conf != null, "Configuration cannot be null");
+      Map<String, String> map = ConfigUtils.extractConfigurationMap(conf,
+          Lists.newArrayList(OrderedGroupedKVInput.getConfigurationKeySet(),
+              TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()), TezRuntimeConfiguration.getAllowedPrefixes());
+      ConfigUtils.addConfigMapToConfiguration(this.conf, map);
+      return this;
+    }
+
+    public Builder setCompression(boolean enabled, @Nullable String compressionCodec,
+                                  @Nullable Map<String, String> codecConf) {
+      this.conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, enabled);
+      if (enabled && compressionCodec != null) {
+        this.conf
+            .set(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, compressionCodec);
+      }
+      if (codecConf != null) {
+        // Merging the confs for now. Change to be specific in the future.
+        ConfigUtils.mergeConfsWithExclusions(this.conf, codecConf,
+            TezRuntimeConfiguration.getRuntimeConfigKeySet());
+      }
+      return this;
+    }
+
+    /**
+     * Set the key serialization class and the relevant comparator to be used for sorting.
+     * Providing a custom serialization class could change the way keys need to be compared in
+     * sorting. Providing an invalid comparator here could produce invalid results.
+     *
+     * @param serializationClassName the key serialization class name
+     * @param comparatorClassName    the key comparator class name
+     * @param serializerConf         the serializer configuration. This can be null, and is a
+     *                               {@link java.util.Map} of key-value pairs. The keys should be limited
+     *                               to the ones required by the serializer.
+     * @return instance of the current builder
+     */
+    public Builder setKeySerializationClass(String serializationClassName,
+        String comparatorClassName, @Nullable Map<String, String> serializerConf) {
+      Preconditions.checkArgument(serializationClassName != null,
+          "serializationClassName cannot be null");
+      Preconditions.checkArgument(comparatorClassName != null,
+          "comparator cannot be null");
+      this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + ","
+          + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
+      setKeyComparatorClass(comparatorClassName, null);
+      if (serializerConf != null) {
+        // Merging the confs for now. Change to be specific in the future.
+        ConfigUtils.mergeConfsWithExclusions(this.conf, serializerConf,
+            TezRuntimeConfiguration.getRuntimeConfigKeySet());
+      }
+      return this;
+    }
+
+    /**
+     * Serialization class to be used for serializing values.
+     *
+     * @param serializationClassName the value serialization class name
+     * @param serializerConf         the serializer configuration. This can be null, and is a
+     *                               {@link java.util.Map} of key-value pairs. The keys should be limited
+     *                               to the ones required by the serializer.
+     * @return instance of the current builder
+     */
+    public Builder setValueSerializationClass(String serializationClassName,
+                                              @Nullable Map<String, String> serializerConf) {
+      Preconditions.checkArgument(serializationClassName != null,
+          "serializationClassName cannot be null");
+      this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + ","
+          + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
+      if (serializerConf != null) {
+        // Merging the confs for now. Change to be specific in the future.
+        ConfigUtils.mergeConfsWithExclusions(this.conf, serializerConf,
+            TezRuntimeConfiguration.getRuntimeConfigKeySet());
+      }
+      return this;
+    }
+
+    /**
+     * Create the actual configuration instance.
+     *
+     * @return an instance of the Configuration
+     */
+    public OrderedGroupedKVInputConfig build() {
+      return new OrderedGroupedKVInputConfig(this.conf, this.useLegacyInput);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/b526ed5c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedGroupedKVInputConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedGroupedKVInputConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedGroupedKVInputConfigurer.java
deleted file mode 100644
index cc6e44b..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedGroupedKVInputConfigurer.java
+++ /dev/null
@@ -1,515 +0,0 @@
-/*
- * *
- *  * Licensed to the Apache Software Foundation (ASF) under one
- *  * or more contributor license agreements.  See the NOTICE file
- *  * distributed with this work for additional information
- *  * regarding copyright ownership.  The ASF licenses this file
- *  * to you under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance
- *  * with the License.  You may obtain a copy of the License at
- *  *
- *  *     http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.tez.runtime.library.conf;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.util.Map;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
-import org.apache.tez.runtime.library.common.ConfigUtils;
-import org.apache.tez.runtime.library.input.OrderedGroupedKVInput;
-import org.apache.tez.runtime.library.input.OrderedGroupedInputLegacy;
-
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-/**
- * Configure {@link org.apache.tez.runtime.library.input.OrderedGroupedKVInput} </p>
- *
- * Values will be picked up from tez-site if not specified, otherwise defaults from
- * {@link org.apache.tez.runtime.library.api.TezRuntimeConfiguration} will be used.
- */
-public class OrderedGroupedKVInputConfigurer {
-
-  /**
-   * Configure parameters which are specific to the Input.
-   */
-  @InterfaceAudience.Private
-  public static interface SpecificConfigurer<T> extends BaseConfigurer<T> {
-
-    /**
-     * Specifies whether the legacy version of this input should be used.
-     * @return instance of the current builder
-     */
-    public T useLegacyInput();
-    /**
-     * Sets the buffer fraction, as a fraction of container size, to be used while fetching remote
-     * data.
-     *
-     * @param shuffleBufferFraction fraction of container size
-     * @return instance of the current builder
-     */
-    public T setShuffleBufferFraction(float shuffleBufferFraction);
-
-    /**
-     * Sets the buffer fraction, as a fraction of container size, to be used after the fetch and
-     * merge are complete. This buffer is used to cache merged data and avoids writing it out to
-     * disk.
-     *
-     * @param postMergeBufferFraction fraction of container size
-     * @return instance of the current builder
-     */
-    public T setPostMergeBufferFraction(float postMergeBufferFraction);
-
-    /**
-     * Sets a size limit on the maximum segment size to be shuffled to disk. This is a fraction of
-     * the shuffle buffer.
-     *
-     * @param maxSingleSegmentFraction fraction of memory determined by ShuffleBufferFraction
-     * @return instance of the current builder
-     */
-    public T setMaxSingleMemorySegmentFraction(float maxSingleSegmentFraction);
-
-    /**
-     * Enable the memory to memory merger
-     *
-     * @param enable whether to enable the memory to memory merger
-     * @return
-     */
-    public T setMemToMemMerger(boolean enable); // Not super useful until additional params are used.
-
-    /**
-     * Configure the point at which in memory segments will be merged. This is specified as a
-     * fraction of the shuffle buffer.
-     *
-     * @param mergeFraction fraction of memory determined by ShuffleBufferFraction, which when
-     *                      filled, will
-     *                      trigger a merge
-     * @return instance of the current builder
-     */
-    public T setMergeFraction(float mergeFraction);
-
-    /**
-     * Configure the combiner class
-     *
-     * @param combinerClassName the combiner class name
-     * @return instance of the current builder
-     */
-    public T setCombiner(String combinerClassName);
-
-    /**
-     * Configure the combiner class and it's associated configuration (specified as key-value
-     * pairs). This method should only be used if the combiner requires some specific configuration.
-     * {@link #setCombiner(String)} is the preferred method for setting a combiner.
-     *
-     * @param combinerClassName the combiner class name
-     * @param combinerConf      the combiner configuration. This can be null, and otherwise
-     *                          is a {@link java.util.Map} of key-value pairs. The keys should
-     *                          be limited to the ones required by the combiner.
-     * @return instance of the current builder
-     */
-    public T setCombiner(String combinerClassName, @Nullable Map<String, String> combinerConf);
-
-  }
-
-  @SuppressWarnings("rawtypes")
-  @InterfaceAudience.Public
-  @InterfaceStability.Evolving
-  public static class SpecificBuilder<E extends HadoopKeyValuesBasedBaseEdgeConfigurer.Builder> implements
-      SpecificConfigurer<SpecificBuilder> {
-
-    private final E edgeBuilder;
-    private final OrderedGroupedKVInputConfigurer.Builder builder;
-
-
-    @InterfaceAudience.Private
-    SpecificBuilder(E edgeBuilder, OrderedGroupedKVInputConfigurer.Builder builder) {
-      this.edgeBuilder = edgeBuilder;
-      this.builder = builder;
-    }
-
-    public SpecificBuilder<E> useLegacyInput() {
-      builder.useLegacyInput();
-      return this;
-    }
-
-    @Override
-    public SpecificBuilder<E> setShuffleBufferFraction(float shuffleBufferFraction) {
-      builder.setShuffleBufferFraction(shuffleBufferFraction);
-      return this;
-    }
-
-    @Override
-    public SpecificBuilder<E> setPostMergeBufferFraction(float postMergeBufferFraction) {
-      builder.setPostMergeBufferFraction(postMergeBufferFraction);
-      return this;
-    }
-
-    @Override
-    public SpecificBuilder<E> setMaxSingleMemorySegmentFraction(float maxSingleSegmentFraction) {
-      builder.setMaxSingleMemorySegmentFraction(maxSingleSegmentFraction);
-      return this;
-    }
-
-    @Override
-    public SpecificBuilder<E> setMemToMemMerger(boolean enable) {
-      builder.setMemToMemMerger(enable);
-      return this;
-    }
-
-    @Override
-    public SpecificBuilder<E> setMergeFraction(float mergeFraction) {
-      builder.setMergeFraction(mergeFraction);
-      return this;
-    }
-
-    @Override
-    public SpecificBuilder<E> setCombiner(String combinerClassName) {
-      return setCombiner(combinerClassName, null);
-    }
-
-    @Override
-    public SpecificBuilder<E> setCombiner(String combinerClassName, Map<String, String> combinerConf) {
-      builder.setCombiner(combinerClassName, combinerConf);
-      return this;
-    }
-
-
-    @Override
-    public SpecificBuilder setAdditionalConfiguration(String key, String value) {
-      builder.setAdditionalConfiguration(key, value);
-      return this;
-    }
-
-    @Override
-    public SpecificBuilder setAdditionalConfiguration(Map<String, String> confMap) {
-      builder.setAdditionalConfiguration(confMap);
-      return this;
-    }
-
-    @Override
-    public SpecificBuilder setFromConfiguration(Configuration conf) {
-      builder.setFromConfiguration(conf);
-      return this;
-    }
-
-    public E done() {
-      return edgeBuilder;
-    }
-
-  }
-
-  @InterfaceAudience.Private
-  @VisibleForTesting
-  Configuration conf;
-
-  private String inputClassName;
-
-  @InterfaceAudience.Private
-  @VisibleForTesting
-  OrderedGroupedKVInputConfigurer() {
-  }
-
-  private OrderedGroupedKVInputConfigurer(Configuration conf, boolean useLegacyInput) {
-    this.conf = conf;
-    if (useLegacyInput) {
-      inputClassName = OrderedGroupedInputLegacy.class.getName();
-    } else {
-      inputClassName = OrderedGroupedKVInput.class.getName();
-    }
-  }
-
-  /**
-   * Get a UserPayload representation of the Configuration
-   * @return a {@link org.apache.tez.dag.api.UserPayload} instance
-   */
-  public UserPayload toUserPayload() {
-    try {
-      return TezUtils.createUserPayloadFromConf(conf);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @InterfaceAudience.Private
-  public void fromUserPayload(UserPayload payload) {
-    try {
-      this.conf = TezUtils.createConfFromUserPayload(payload);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  public String getInputClassName() {
-    return inputClassName;
-  }
-
-  public static Builder newBuilder(String keyClass, String valueClass) {
-    return new Builder(keyClass, valueClass);
-  }
-
-  @InterfaceAudience.Public
-  @InterfaceStability.Evolving
-  public static class Builder implements SpecificConfigurer<Builder> {
-
-    private final Configuration conf = new Configuration(false);
-    private boolean useLegacyInput = false;
-
-    /**
-     * Create a configuration builder for {@link org.apache.tez.runtime.library.input.OrderedGroupedKVInput}
-     *
-     * @param keyClassName         the key class name
-     * @param valueClassName       the value class name
-     */
-    @InterfaceAudience.Private
-    Builder(String keyClassName, String valueClassName) {
-      this();
-      Preconditions.checkNotNull(keyClassName, "Key class name cannot be null");
-      Preconditions.checkNotNull(valueClassName, "Value class name cannot be null");
-      setKeyClassName(keyClassName);
-      setValueClassName(valueClassName);
-    }
-
-    @InterfaceAudience.Private
-    Builder() {
-      Map<String, String> tezDefaults = ConfigUtils
-          .extractConfigurationMap(TezRuntimeConfiguration.getTezRuntimeConfigDefaults(),
-              OrderedGroupedKVInput.getConfigurationKeySet());
-      ConfigUtils.addConfigMapToConfiguration(this.conf, tezDefaults);
-      ConfigUtils.addConfigMapToConfiguration(this.conf, TezRuntimeConfiguration.getOtherConfigDefaults());
-    }
-
-    @InterfaceAudience.Private
-    Builder setKeyClassName(String keyClassName) {
-      Preconditions.checkNotNull(keyClassName, "Key class name cannot be null");
-      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, keyClassName);
-      return this;
-    }
-
-    @InterfaceAudience.Private
-    Builder setValueClassName(String valueClassName) {
-      Preconditions.checkNotNull(valueClassName, "Value class name cannot be null");
-      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, valueClassName);
-      return this;
-    }
-
-    public Builder useLegacyInput() {
-      this.useLegacyInput = true;
-      return this;
-    }
-
-    @Override
-    public Builder setShuffleBufferFraction(float shuffleBufferFraction) {
-      this.conf
-          .setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, shuffleBufferFraction);
-      return this;
-    }
-
-    @Override
-    public Builder setPostMergeBufferFraction(float postMergeBufferFraction) {
-      this.conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_BUFFER_PERCENT, postMergeBufferFraction);
-      return this;
-    }
-
-    @Override
-    public Builder setMaxSingleMemorySegmentFraction(float maxSingleSegmentFraction) {
-      this.conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT,
-          maxSingleSegmentFraction);
-      return this;
-    }
-
-    @Override
-    public Builder setMemToMemMerger(boolean enable) {
-      this.conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM, enable);
-      return this;
-    }
-
-    @Override
-    public Builder setMergeFraction(float mergeFraction) {
-      this.conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, mergeFraction);
-      return this;
-    }
-
-    public Builder setCombiner(String combinerClassName) {
-      return setCombiner(combinerClassName, null);
-    }
-
-    @Override
-    public Builder setCombiner(String combinerClassName, Map<String, String> combinerConf) {
-      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS, combinerClassName);
-      if (combinerConf != null) {
-        // Merging the confs for now. Change to be specific in the future.
-        ConfigUtils.mergeConfsWithExclusions(this.conf, combinerConf,
-            TezRuntimeConfiguration.getRuntimeConfigKeySet());
-      }
-      return this;
-    }
-
-    /**
-     * Set the key comparator class
-     *
-     * @param comparatorClassName the key comparator class name
-     * @return instance of the current builder
-     */
-    public Builder setKeyComparatorClass(String comparatorClassName) {
-      return this.setKeyComparatorClass(comparatorClassName, null);
-    }
-
-    /**
-     * Set the key comparator class and it's associated configuration. This method should only be
-     * used if the comparator requires some specific configuration, which is typically not the
-     * case. {@link #setKeyComparatorClass(String)} is the preferred method for setting a
-     * comparator.
-     *
-     * @param comparatorClassName the key comparator class name
-     * @param comparatorConf      the comparator configuration. This can be null, and is a {@link
-     *                            java.util.Map} of key-value pairs. The keys should be limited to
-     *                            the ones required by the comparator.
-     * @return instance of the current builder
-     */
-    public Builder setKeyComparatorClass(String comparatorClassName,
-                                         @Nullable Map<String, String> comparatorConf) {
-      Preconditions.checkNotNull(comparatorClassName, "Comparator class name cannot be null");
-      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS,
-          comparatorClassName);
-      if (comparatorConf != null) {
-        // Merging the confs for now. Change to be specific in the future.
-        ConfigUtils.mergeConfsWithExclusions(this.conf, comparatorConf,
-            TezRuntimeConfiguration.getRuntimeConfigKeySet());
-      }
-      return this;
-    }
-    @Override
-    public Builder setAdditionalConfiguration(String key, String value) {
-      Preconditions.checkNotNull(key, "Key cannot be null");
-      if (ConfigUtils.doesKeyQualify(key,
-          Lists.newArrayList(OrderedGroupedKVInput.getConfigurationKeySet(),
-              TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()),
-          TezRuntimeConfiguration.getAllowedPrefixes())) {
-        if (value == null) {
-          this.conf.unset(key);
-        } else {
-          this.conf.set(key, value);
-        }
-      }
-      return this;
-    }
-
-    @Override
-    public Builder setAdditionalConfiguration(Map<String, String> confMap) {
-      Preconditions.checkNotNull(confMap, "ConfMap cannot be null");
-      Map<String, String> map = ConfigUtils.extractConfigurationMap(confMap,
-          Lists.newArrayList(OrderedGroupedKVInput.getConfigurationKeySet(),
-              TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()), TezRuntimeConfiguration.getAllowedPrefixes());
-      ConfigUtils.addConfigMapToConfiguration(this.conf, map);
-      return this;
-    }
-
-    @Override
-    public Builder setFromConfiguration(Configuration conf) {
-      // Maybe ensure this is the first call ? Otherwise this can end up overriding other parameters
-      Preconditions.checkArgument(conf != null, "Configuration cannot be null");
-      Map<String, String> map = ConfigUtils.extractConfigurationMap(conf,
-          Lists.newArrayList(OrderedGroupedKVInput.getConfigurationKeySet(),
-              TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()), TezRuntimeConfiguration.getAllowedPrefixes());
-      ConfigUtils.addConfigMapToConfiguration(this.conf, map);
-      return this;
-    }
-
-    public Builder setCompression(boolean enabled, @Nullable String compressionCodec,
-                                  @Nullable Map<String, String> codecConf) {
-      this.conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, enabled);
-      if (enabled && compressionCodec != null) {
-        this.conf
-            .set(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, compressionCodec);
-      }
-      if (codecConf != null) {
-        // Merging the confs for now. Change to be specific in the future.
-        ConfigUtils.mergeConfsWithExclusions(this.conf, codecConf,
-            TezRuntimeConfiguration.getRuntimeConfigKeySet());
-      }
-      return this;
-    }
-
-    /**
-     * Set serialization class and the relevant comparator to be used for sorting.
-     * Providing custom serialization class could change the way, keys needs to be compared in
-     * sorting. Providing invalid comparator here could create invalid results.
-     *
-     * @param serializationClassName
-     * @param comparatorClassName
-     * @param serializerConf         the serializer configuration. This can be null, and is a
-     *                               {@link java.util.Map} of key-value pairs. The keys should be limited
-     *                               to the ones required by the comparator.
-     * @return
-     */
-    public Builder setKeySerializationClass(String serializationClassName,
-        String comparatorClassName, @Nullable Map<String, String> serializerConf) {
-      Preconditions.checkArgument(serializationClassName != null,
-          "serializationClassName cannot be null");
-      Preconditions.checkArgument(comparatorClassName != null,
-          "comparator cannot be null");
-      this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + ","
-          + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
-      setKeyComparatorClass(comparatorClassName, null);
-      if (serializerConf != null) {
-        // Merging the confs for now. Change to be specific in the future.
-        ConfigUtils.mergeConfsWithExclusions(this.conf, serializerConf,
-            TezRuntimeConfiguration.getRuntimeConfigKeySet());
-      }
-      return this;
-    }
-
-    /**
-     * Serialization class to be used for serializing values.
-     *
-     * @param serializationClassName
-     * @param serializerConf         the serializer configuration. This can be null, and is a
-     *                               {@link java.util.Map} of key-value pairs. The keys should be limited
-     *                               to the ones required by the comparator.
-     * @return
-     */
-    public Builder setValueSerializationClass(String serializationClassName,
-                                              @Nullable Map<String, String> serializerConf) {
-      Preconditions.checkArgument(serializationClassName != null,
-          "serializationClassName cannot be null");
-      this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + ","
-          + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
-      if (serializerConf != null) {
-        // Merging the confs for now. Change to be specific in the future.
-        ConfigUtils.mergeConfsWithExclusions(this.conf, serializerConf,
-            TezRuntimeConfiguration.getRuntimeConfigKeySet());
-      }
-      return this;
-    }
-
-    /**
-     * Create the actual configuration instance.
-     *
-     * @return an instance of the Configuration
-     */
-    public OrderedGroupedKVInputConfigurer build() {
-      return new OrderedGroupedKVInputConfigurer(this.conf, this.useLegacyInput);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/b526ed5c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfig.java
new file mode 100644
index 0000000..090b12f
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfig.java
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.conf;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;
+
+/**
+ * Configure payloads for the OrderedPartitionedKVOutput and OrderedGroupedKVInput pair. <p>
+ *
+ * Values will be picked up from tez-site if not specified, otherwise defaults from
+ * {@link org.apache.tez.runtime.library.api.TezRuntimeConfiguration} will be used.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class OrderedPartitionedKVEdgeConfig
+    extends HadoopKeyValuesBasedBaseEdgeConfig {
+
+  private final OrderedPartitionedKVOutputConfig outputConf;
+  private final OrderedGroupedKVInputConfig inputConf;
+
+  private OrderedPartitionedKVEdgeConfig(
+      OrderedPartitionedKVOutputConfig outputConfiguration,
+      OrderedGroupedKVInputConfig inputConfiguration) {
+    this.outputConf = outputConfiguration;
+    this.inputConf = inputConfiguration;
+  }
+
+  /**
+   * Create a builder to configure the relevant Input and Output. </p> This method should only be
+   * used when using a custom Partitioner which requires specific Configuration. {@link
+   * #newBuilder(String, String, String)} is the preferred method to crate an instance of the
+   * Builder
+   *
+   * @param keyClassName         the key class name
+   * @param valueClassName       the value class name
+   * @param partitionerClassName the partitioner class name
+   * @param partitionerConf      the partitioner configuration. This can be null, and is a {@link
+   *                             java.util.Map} of key-value pairs. The keys should be limited to
+   *                             the ones required by the partitioner.
+   * @return a builder to configure the edge
+   */
+  public static Builder newBuilder(String keyClassName, String valueClassName,
+                                   String partitionerClassName,
+                                   @Nullable Map<String, String> partitionerConf) {
+    return new Builder(keyClassName, valueClassName, partitionerClassName, partitionerConf);
+  }
+
+  /**
+   * Create a builder to configure the relevant Input and Output
+   *
+   * @param keyClassName         the key class name
+   * @param valueClassName       the value class name
+   * @param partitionerClassName the partitioner class name
+   * @return a builder to configure the edge
+   */
+  public static Builder newBuilder(String keyClassName, String valueClassName,
+                                   String partitionerClassName) {
+    return newBuilder(keyClassName, valueClassName, partitionerClassName, null);
+  }
+
+  @Override
+  public UserPayload getOutputPayload() {
+    return outputConf.toUserPayload();
+  }
+
+  @Override
+  public String getOutputClassName() {
+    return OrderedPartitionedKVOutput.class.getName();
+  }
+
+  @Override
+  public UserPayload getInputPayload() {
+    return inputConf.toUserPayload();
+  }
+
+  @Override
+  public String getInputClassName() {
+    return inputConf.getInputClassName();
+  }
+
+  /**
+   * This is a convenience method for the typical usage of this edge, and creates an instance of
+   * {@link org.apache.tez.dag.api.EdgeProperty} which is likely to be used. <p>
+   * In this case - DataMovementType.SCATTER_GATHER, EdgeProperty.DataSourceType.PERSISTED,
+   * EdgeProperty.SchedulingType.SEQUENTIAL
+   *
+   * @return an {@link org.apache.tez.dag.api.EdgeProperty} instance
+   */
+  public EdgeProperty createDefaultEdgeProperty() {
+    EdgeProperty edgeProperty = new EdgeProperty(EdgeProperty.DataMovementType.SCATTER_GATHER,
+        EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL,
+        new OutputDescriptor(
+            getOutputClassName()).setUserPayload(getOutputPayload()),
+        new InputDescriptor(
+            getInputClassName()).setUserPayload(getInputPayload()));
+    return edgeProperty;
+  }
+
+  /**
+   * This is a convenience method for creating an Edge descriptor based on the specified
+   * {@link org.apache.tez.dag.api.EdgeManagerPluginDescriptor}.
+   *
+   * @param edgeManagerDescriptor the custom edge specification
+   * @return an {@link org.apache.tez.dag.api.EdgeProperty} instance
+   */
+  public EdgeProperty createDefaultCustomEdgeProperty(EdgeManagerPluginDescriptor edgeManagerDescriptor) {
+    Preconditions.checkNotNull(edgeManagerDescriptor, "EdgeManagerDescriptor cannot be null");
+    EdgeProperty edgeProperty =
+        new EdgeProperty(edgeManagerDescriptor, EdgeProperty.DataSourceType.PERSISTED,
+            EdgeProperty.SchedulingType.SEQUENTIAL,
+            new OutputDescriptor(getOutputClassName()).setUserPayload(getOutputPayload()),
+            new InputDescriptor(getInputClassName()).setUserPayload(getInputPayload()));
+    return edgeProperty;
+  }
+
+  @InterfaceAudience.Public
+  @InterfaceStability.Evolving
+  public static class Builder extends HadoopKeyValuesBasedBaseEdgeConfig.Builder<Builder> {
+
+    private final OrderedPartitionedKVOutputConfig.Builder outputBuilder =
+        new OrderedPartitionedKVOutputConfig.Builder();
+    private final OrderedPartitionedKVOutputConfig.SpecificBuilder<OrderedPartitionedKVEdgeConfig.Builder>
+        specificOutputBuilder =
+        new OrderedPartitionedKVOutputConfig.SpecificBuilder<OrderedPartitionedKVEdgeConfig.Builder>(
+            this, outputBuilder);
+
+    private final OrderedGroupedKVInputConfig.Builder inputBuilder =
+        new OrderedGroupedKVInputConfig.Builder();
+    private final OrderedGroupedKVInputConfig.SpecificBuilder<OrderedPartitionedKVEdgeConfig.Builder>
+        specificInputBuilder =
+        new OrderedGroupedKVInputConfig.SpecificBuilder<OrderedPartitionedKVEdgeConfig.Builder>(this,
+            inputBuilder);
+
+    @InterfaceAudience.Private
+    Builder(String keyClassName, String valueClassName, String partitionerClassName,
+            Map<String, String> partitionerConf) {
+      outputBuilder.setKeyClassName(keyClassName);
+      outputBuilder.setValueClassName(valueClassName);
+      outputBuilder.setPartitioner(partitionerClassName, partitionerConf);
+      inputBuilder.setKeyClassName(keyClassName);
+      inputBuilder.setValueClassName(valueClassName);
+    }
+
+    /**
+     * Set the key comparator class
+     *
+     * @param comparatorClassName the key comparator class name
+     * @return instance of the current builder
+     */
+    public Builder setKeyComparatorClass(String comparatorClassName) {
+      return setKeyComparatorClass(comparatorClassName, null);
+    }
+
+    /**
+     * Set the key comparator class and its associated configuration. This method should only be
+     * used if the comparator requires some specific configuration, which is typically not the
+     * case. {@link #setKeyComparatorClass(String)} is the preferred method for setting a
+     * comparator.
+     *
+     * @param comparatorClassName the key comparator class name
+     * @param comparatorConf      the comparator configuration. This can be null, and is a {@link
+     *                            java.util.Map} of key-value pairs. The keys should be limited to
+     *                            the ones required by the comparator.
+     * @return instance of the current builder
+     */
+    public Builder setKeyComparatorClass(String comparatorClassName,
+                                         @Nullable Map<String, String> comparatorConf) {
+      outputBuilder.setKeyComparatorClass(comparatorClassName, comparatorConf);
+      inputBuilder.setKeyComparatorClass(comparatorClassName, comparatorConf);
+      return this;
+    }
+
+    /**
+     * Set serialization class and the relevant comparator to be used for sorting.
+     * Providing a custom serialization class could change the way keys need to be compared in
+     * sorting. Providing an invalid comparator here could produce invalid results.
+     *
+     * @param serializationClassName the serialization class name
+     * @param comparatorClassName    the key comparator class name
+     * @param serializerConf         the serializer configuration. This can be null, and is a
+     *                               {@link java.util.Map} of key-value pairs. The keys should be limited
+     *                               to the ones required by the serializer.
+     * @return instance of the current builder
+     */
+    public Builder setKeySerializationClass(String serializationClassName,
+        String comparatorClassName, @Nullable Map<String, String> serializerConf) {
+      outputBuilder.setKeySerializationClass(serializationClassName, comparatorClassName, serializerConf);
+      inputBuilder.setKeySerializationClass(serializationClassName, comparatorClassName, serializerConf);
+      return this;
+    }
+
+    /**
+     * Set serialization class responsible for providing serializer/deserializer for values.
+     *
+     * @param serializationClassName the serialization class name
+     * @param serializerConf         the serializer configuration. This can be null, and is a
+     *                               {@link java.util.Map} of key-value pairs. The keys should be limited
+     *                               to the ones required by the serializer.
+     * @return instance of the current builder
+     */
+    public Builder setValueSerializationClass(String serializationClassName,
+                                              @Nullable Map<String, String> serializerConf) {
+      outputBuilder.setValueSerializationClass(serializationClassName, serializerConf);
+      inputBuilder.setValueSerializationClass(serializationClassName, serializerConf);
+      return this;
+    }
+
+
+    @Override
+    public Builder setCompression(boolean enabled, @Nullable String compressionCodec,
+                                  @Nullable Map<String, String> codecConf) {
+      outputBuilder.setCompression(enabled, compressionCodec, codecConf);
+      inputBuilder.setCompression(enabled, compressionCodec, codecConf);
+      return this;
+    }
+
+    @Override
+    public Builder setAdditionalConfiguration(String key, String value) {
+      outputBuilder.setAdditionalConfiguration(key, value);
+      inputBuilder.setAdditionalConfiguration(key, value);
+      return this;
+    }
+
+    @Override
+    public Builder setAdditionalConfiguration(Map<String, String> confMap) {
+      outputBuilder.setAdditionalConfiguration(confMap);
+      inputBuilder.setAdditionalConfiguration(confMap);
+      return this;
+    }
+
+    @Override
+    public Builder setFromConfiguration(Configuration conf) {
+      outputBuilder.setFromConfiguration(conf);
+      inputBuilder.setFromConfiguration(conf);
+      return this;
+    }
+
+    /**
+     * Configure the specific output
+     * @return a builder to configure the output
+     */
+    public OrderedPartitionedKVOutputConfig.SpecificBuilder<Builder> configureOutput() {
+      return specificOutputBuilder;
+    }
+
+    /**
+     * Configure the specific input
+     * @return a builder to configure the input
+     */
+    public OrderedGroupedKVInputConfig.SpecificBuilder<Builder> configureInput() {
+      return specificInputBuilder;
+    }
+
+    /**
+     * Build and return an instance of the configuration
+     * @return an instance of the actual configuration
+     */
+    public OrderedPartitionedKVEdgeConfig build() {
+      return new OrderedPartitionedKVEdgeConfig(outputBuilder.build(), inputBuilder.build());
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/b526ed5c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfigurer.java
deleted file mode 100644
index 3abaf47..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfigurer.java
+++ /dev/null
@@ -1,292 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.runtime.library.conf;
-
-import javax.annotation.Nullable;
-
-import java.util.Map;
-
-import com.google.common.base.Preconditions;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
-import org.apache.tez.dag.api.EdgeProperty;
-import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.api.OutputDescriptor;
-import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;
-
-/**
- * Configure payloads for the OrderedPartitionedKVOutput and OrderedGroupedKVInput pair </p>
- *
- * Values will be picked up from tez-site if not specified, otherwise defaults from
- * {@link org.apache.tez.runtime.library.api.TezRuntimeConfiguration} will be used.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public class OrderedPartitionedKVEdgeConfigurer extends HadoopKeyValuesBasedBaseEdgeConfigurer {
-
-  private final OrderedPartitionedKVOutputConfigurer outputConf;
-  private final OrderedGroupedKVInputConfigurer inputConf;
-
-  private OrderedPartitionedKVEdgeConfigurer(
-      OrderedPartitionedKVOutputConfigurer outputConfiguration,
-      OrderedGroupedKVInputConfigurer inputConfiguration) {
-    this.outputConf = outputConfiguration;
-    this.inputConf = inputConfiguration;
-  }
-
-  /**
-   * Create a builder to configure the relevant Input and Output. </p> This method should only be
-   * used when using a custom Partitioner which requires specific Configuration. {@link
-   * #newBuilder(String, String, String)} is the preferred method to crate an instance of the
-   * Builder
-   *
-   * @param keyClassName         the key class name
-   * @param valueClassName       the value class name
-   * @param partitionerClassName the partitioner class name
-   * @param partitionerConf      the partitioner configuration. This can be null, and is a {@link
-   *                             java.util.Map} of key-value pairs. The keys should be limited to
-   *                             the ones required by the partitioner.
-   * @return a builder to configure the edge
-   */
-  public static Builder newBuilder(String keyClassName, String valueClassName,
-                                   String partitionerClassName,
-                                   @Nullable Map<String, String> partitionerConf) {
-    return new Builder(keyClassName, valueClassName, partitionerClassName, partitionerConf);
-  }
-
-  /**
-   * Create a builder to configure the relevant Input and Output
-   *
-   * @param keyClassName         the key class name
-   * @param valueClassName       the value class name
-   * @param partitionerClassName the partitioner class name
-   * @return a builder to configure the edge
-   */
-  public static Builder newBuilder(String keyClassName, String valueClassName,
-                                   String partitionerClassName) {
-    return newBuilder(keyClassName, valueClassName, partitionerClassName, null);
-  }
-
-  @Override
-  public UserPayload getOutputPayload() {
-    return outputConf.toUserPayload();
-  }
-
-  @Override
-  public String getOutputClassName() {
-    return OrderedPartitionedKVOutput.class.getName();
-  }
-
-  @Override
-  public UserPayload getInputPayload() {
-    return inputConf.toUserPayload();
-  }
-
-  @Override
-  public String getInputClassName() {
-    return inputConf.getInputClassName();
-  }
-
-  /**
-   * This is a convenience method for the typical usage of this edge, and creates an instance of
-   * {@link org.apache.tez.dag.api.EdgeProperty} which is likely to be used. </p>
-   * * In this case - DataMovementType.SCATTER_GATHER, EdgeProperty.DataSourceType.PERSISTED,
-   * EdgeProperty.SchedulingType.SEQUENTIAL
-   *
-   * @return an {@link org.apache.tez.dag.api.EdgeProperty} instance
-   */
-  public EdgeProperty createDefaultEdgeProperty() {
-    EdgeProperty edgeProperty = new EdgeProperty(EdgeProperty.DataMovementType.SCATTER_GATHER,
-        EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL,
-        new OutputDescriptor(
-            getOutputClassName()).setUserPayload(getOutputPayload()),
-        new InputDescriptor(
-            getInputClassName()).setUserPayload(getInputPayload()));
-    return edgeProperty;
-  }
-
-  /**
-   * This is a convenience method for creating an Edge descriptor based on the specified
-   * EdgeManagerDescriptor.
-   *
-   * @param edgeManagerDescriptor the custom edge specification
-   * @return an {@link org.apache.tez.dag.api.EdgeProperty} instance
-   */
-  public EdgeProperty createDefaultCustomEdgeProperty(EdgeManagerPluginDescriptor edgeManagerDescriptor) {
-    Preconditions.checkNotNull(edgeManagerDescriptor, "EdgeManagerDescriptor cannot be null");
-    EdgeProperty edgeProperty =
-        new EdgeProperty(edgeManagerDescriptor, EdgeProperty.DataSourceType.PERSISTED,
-            EdgeProperty.SchedulingType.SEQUENTIAL,
-            new OutputDescriptor(getOutputClassName()).setUserPayload(getOutputPayload()),
-            new InputDescriptor(getInputClassName()).setUserPayload(getInputPayload()));
-    return edgeProperty;
-  }
-
-  @InterfaceAudience.Public
-  @InterfaceStability.Evolving
-  public static class Builder extends HadoopKeyValuesBasedBaseEdgeConfigurer.Builder<Builder> {
-
-    private final OrderedPartitionedKVOutputConfigurer.Builder outputBuilder =
-        new OrderedPartitionedKVOutputConfigurer.Builder();
-    private final OrderedPartitionedKVOutputConfigurer.SpecificBuilder<OrderedPartitionedKVEdgeConfigurer.Builder>
-        specificOutputBuilder =
-        new OrderedPartitionedKVOutputConfigurer.SpecificBuilder<OrderedPartitionedKVEdgeConfigurer.Builder>(
-            this, outputBuilder);
-
-    private final OrderedGroupedKVInputConfigurer.Builder inputBuilder =
-        new OrderedGroupedKVInputConfigurer.Builder();
-    private final OrderedGroupedKVInputConfigurer.SpecificBuilder<OrderedPartitionedKVEdgeConfigurer.Builder>
-        specificInputBuilder =
-        new OrderedGroupedKVInputConfigurer.SpecificBuilder<OrderedPartitionedKVEdgeConfigurer.Builder>(this,
-            inputBuilder);
-
-    @InterfaceAudience.Private
-    Builder(String keyClassName, String valueClassName, String partitionerClassName,
-            Map<String, String> partitionerConf) {
-      outputBuilder.setKeyClassName(keyClassName);
-      outputBuilder.setValueClassName(valueClassName);
-      outputBuilder.setPartitioner(partitionerClassName, partitionerConf);
-      inputBuilder.setKeyClassName(keyClassName);
-      inputBuilder.setValueClassName(valueClassName);
-    }
-
-    /**
-     * Set the key comparator class
-     *
-     * @param comparatorClassName the key comparator class name
-     * @return instance of the current builder
-     */
-    public Builder setKeyComparatorClass(String comparatorClassName) {
-      return setKeyComparatorClass(comparatorClassName, null);
-    }
-
-    /**
-     * Set the key comparator class and it's associated configuration. This method should only be
-     * used if the comparator requires some specific configuration, which is typically not the
-     * case. {@link #setKeyComparatorClass(String)} is the preferred method for setting a
-     * comparator.
-     *
-     * @param comparatorClassName the key comparator class name
-     * @param comparatorConf      the comparator configuration. This can be null, and is a {@link
-     *                            java.util.Map} of key-value pairs. The keys should be limited to
-     *                            the ones required by the comparator.
-     * @return instance of the current builder
-     */
-    public Builder setKeyComparatorClass(String comparatorClassName,
-                                         @Nullable Map<String, String> comparatorConf) {
-      outputBuilder.setKeyComparatorClass(comparatorClassName, comparatorConf);
-      inputBuilder.setKeyComparatorClass(comparatorClassName, comparatorConf);
-      return this;
-    }
-
-    /**
-     * Set serialization class and the relevant comparator to be used for sorting.
-     * Providing custom serialization class could change the way, keys needs to be compared in
-     * sorting. Providing invalid comparator here could create invalid results.
-     *
-     * @param serializationClassName
-     * @param comparatorClassName
-     * @param serializerConf         the serializer configuration. This can be null, and is a
-     *                               {@link java.util.Map} of key-value pairs. The keys should be limited
-     *                               to the ones required by the comparator.
-     * @return
-     */
-    public Builder setKeySerializationClass(String serializationClassName,
-        String comparatorClassName, @Nullable Map<String, String> serializerConf) {
-      outputBuilder.setKeySerializationClass(serializationClassName, comparatorClassName, serializerConf);
-      inputBuilder.setKeySerializationClass(serializationClassName, comparatorClassName, serializerConf);
-      return this;
-    }
-
-    /**
-     * Set serialization class responsible for providing serializer/deserializer for values.
-     *
-     * @param serializationClassName
-     * @param serializerConf         the serializer configuration. This can be null, and is a
-     *                               {@link java.util.Map} of key-value pairs. The keys should be limited
-     *                               to the ones required by the comparator.
-     * @return
-     */
-    public Builder setValueSerializationClass(String serializationClassName,
-                                              @Nullable Map<String, String> serializerConf) {
-      outputBuilder.setValueSerializationClass(serializationClassName, serializerConf);
-      inputBuilder.setValueSerializationClass(serializationClassName, serializerConf);
-      return this;
-    }
-
-
-    @Override
-    public Builder setCompression(boolean enabled, @Nullable String compressionCodec,
-                                  @Nullable Map<String, String> codecConf) {
-      outputBuilder.setCompression(enabled, compressionCodec, codecConf);
-      inputBuilder.setCompression(enabled, compressionCodec, codecConf);
-      return this;
-    }
-
-    @Override
-    public Builder setAdditionalConfiguration(String key, String value) {
-      outputBuilder.setAdditionalConfiguration(key, value);
-      inputBuilder.setAdditionalConfiguration(key, value);
-      return this;
-    }
-
-    @Override
-    public Builder setAdditionalConfiguration(Map<String, String> confMap) {
-      outputBuilder.setAdditionalConfiguration(confMap);
-      inputBuilder.setAdditionalConfiguration(confMap);
-      return this;
-    }
-
-    @Override
-    public Builder setFromConfiguration(Configuration conf) {
-      outputBuilder.setFromConfiguration(conf);
-      inputBuilder.setFromConfiguration(conf);
-      return this;
-    }
-
-    /**
-     * Configure the specific output
-     * @return a builder to configure the output
-     */
-    public OrderedPartitionedKVOutputConfigurer.SpecificBuilder<Builder> configureOutput() {
-      return specificOutputBuilder;
-    }
-
-    /**
-     * Configure the specific input
-     * @return a builder to configure the input
-     */
-    public OrderedGroupedKVInputConfigurer.SpecificBuilder<Builder> configureInput() {
-      return specificInputBuilder;
-    }
-
-    /**
-     * Build and return an instance of the configuration
-     * @return an instance of the acatual configuration
-     */
-    public OrderedPartitionedKVEdgeConfigurer build() {
-      return new OrderedPartitionedKVEdgeConfigurer(outputBuilder.build(), inputBuilder.build());
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/b526ed5c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVOutputConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVOutputConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVOutputConfig.java
new file mode 100644
index 0000000..f9b468c
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVOutputConfig.java
@@ -0,0 +1,441 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.tez.runtime.library.conf;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Map;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;
+
+
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+/**
+ * Configure {@link org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput}.
+ * <p>
+ * Values will be picked up from tez-site if not specified, otherwise defaults from
+ * {@link org.apache.tez.runtime.library.api.TezRuntimeConfiguration} will be used.
+ */
+public class OrderedPartitionedKVOutputConfig {
+
+  /**
+   * Configure parameters which are specific to the Output.
+   */
+  @InterfaceAudience.Private
+  public static interface SpecificConfigBuilder<T> extends BaseConfigBuilder<T> {
+    /**
+     * Set the buffer size to use when sorting the output
+     *
+     * @param sortBufferSize the size of the buffer in MB
+     * @return instance of the current builder
+     */
+    public T setSortBufferSize(int sortBufferSize);
+
+
+    /**
+     * Configure the combiner class
+     *
+     * @param combinerClassName the combiner class name
+     * @return instance of the current builder
+     */
+    public T setCombiner(String combinerClassName);
+
+    /**
+     * Configure the combiner class and its associated configuration (specified as key-value
+     * pairs). This method should only be used if the combiner requires some specific configuration.
+     * {@link #setCombiner(String)} is the preferred method for setting a combiner.
+     *
+     * @param combinerClassName the combiner class name
+     * @param combinerConf      the combiner configuration. This can be null, and otherwise
+     *                          is a {@link java.util.Map} of key-value pairs. The keys should
+     *                          be limited to the ones required by the combiner.
+     * @return instance of the current builder
+     */
+    public T setCombiner(String combinerClassName, @Nullable Map<String, String> combinerConf);
+
+
+
+    /**
+     * Configure the number of threads to be used by the sorter
+     *
+     * @param numThreads the number of threads
+     * @return instance of the current builder
+     */
+    public T setSorterNumThreads(int numThreads);
+  }
+
+  @SuppressWarnings("rawtypes")
+  @InterfaceAudience.Public
+  @InterfaceStability.Evolving
+  public static class SpecificBuilder<E extends HadoopKeyValuesBasedBaseEdgeConfig.Builder> implements
+      SpecificConfigBuilder<SpecificBuilder> {
+
+    private final E edgeBuilder;
+    private final Builder builder;
+
+    SpecificBuilder(E edgeBuilder, Builder builder) {
+      this.edgeBuilder = edgeBuilder;
+      this.builder = builder;
+    }
+
+    @Override
+    public SpecificBuilder<E> setSortBufferSize(int sortBufferSize) {
+      builder.setSortBufferSize(sortBufferSize);
+      return this;
+    }
+
+    public SpecificBuilder<E> setCombiner(String combinerClassName) {
+      return this.setCombiner(combinerClassName, null);
+    }
+
+    @Override
+    public SpecificBuilder<E> setCombiner(String combinerClassName, Map<String, String> combinerConf) {
+      builder.setCombiner(combinerClassName, combinerConf);
+      return this;
+    }
+
+    @Override
+    public SpecificBuilder<E> setSorterNumThreads(int numThreads) {
+      builder.setSorterNumThreads(numThreads);
+      return this;
+    }
+
+    @Override
+    public SpecificBuilder setAdditionalConfiguration(String key, String value) {
+      builder.setAdditionalConfiguration(key, value);
+      return this;
+    }
+
+    @Override
+    public SpecificBuilder setAdditionalConfiguration(Map<String, String> confMap) {
+      builder.setAdditionalConfiguration(confMap);
+      return this;
+    }
+
+    @Override
+    public SpecificBuilder setFromConfiguration(Configuration conf) {
+      builder.setFromConfiguration(conf);
+      return this;
+    }
+
+    public E done() {
+      return edgeBuilder;
+    }
+  }
+
+  @InterfaceAudience.Private
+  @VisibleForTesting
+  Configuration conf;
+
+  @InterfaceAudience.Private
+  @VisibleForTesting
+  OrderedPartitionedKVOutputConfig() {
+  }
+
+  private OrderedPartitionedKVOutputConfig(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Get a UserPayload representation of the Configuration
+   * @return a {@link org.apache.tez.dag.api.UserPayload} instance
+   */
+  public UserPayload toUserPayload() {
+    try {
+      return TezUtils.createUserPayloadFromConf(conf);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @InterfaceAudience.Private
+  public void fromUserPayload(UserPayload payload) {
+    try {
+      this.conf = TezUtils.createConfFromUserPayload(payload);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public static Builder newBuilder(String keyClass, String valueClass, String partitionerClassName) {
+    return newBuilder(keyClass, valueClass, partitionerClassName, null);
+  }
+
+  public static Builder newBuilder(String keyClass, String valueClass, String partitionerClassName,
+                                   Map<String, String> partitionerConf) {
+    return new Builder(keyClass, valueClass, partitionerClassName, partitionerConf);
+  }
+
+  @InterfaceAudience.Public
+  @InterfaceStability.Evolving
+  public static class Builder implements SpecificConfigBuilder<Builder> {
+
+    private final Configuration conf = new Configuration(false);
+
+    /**
+     * Create a configuration builder for {@link org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput}
+     *
+     * @param keyClassName         the key class name
+     * @param valueClassName       the value class name
+     * @param partitionerClassName the partitioner class name
+     * @param partitionerConf      the partitioner configuration. This can be null, and is a {@link
+     *                             java.util.Map} of key-value pairs. The keys should be limited to
+     *                             the ones required by the partitioner.
+     */
+    @InterfaceAudience.Private
+    Builder(String keyClassName, String valueClassName, String partitionerClassName,
+                   @Nullable Map<String, String> partitionerConf) {
+      this();
+      Preconditions.checkNotNull(keyClassName, "Key class name cannot be null");
+      Preconditions.checkNotNull(valueClassName, "Value class name cannot be null");
+      Preconditions.checkNotNull(partitionerClassName, "Partitioner class name cannot be null");
+      setKeyClassName(keyClassName);
+      setValueClassName(valueClassName);
+      setPartitioner(partitionerClassName, partitionerConf);
+    }
+
+    @InterfaceAudience.Private
+    Builder() {
+      Map<String, String> tezDefaults = ConfigUtils
+          .extractConfigurationMap(TezRuntimeConfiguration.getTezRuntimeConfigDefaults(),
+              OrderedPartitionedKVOutput.getConfigurationKeySet());
+      ConfigUtils.addConfigMapToConfiguration(this.conf, tezDefaults);
+      ConfigUtils.addConfigMapToConfiguration(this.conf, TezRuntimeConfiguration.getOtherConfigDefaults());
+    }
+
+    @InterfaceAudience.Private
+    Builder setKeyClassName(String keyClassName) {
+      Preconditions.checkNotNull(keyClassName, "Key class name cannot be null");
+      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, keyClassName);
+      return this;
+    }
+
+    @InterfaceAudience.Private
+    Builder setValueClassName(String valueClassName) {
+      Preconditions.checkNotNull(valueClassName, "Value class name cannot be null");
+      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, valueClassName);
+      return this;
+    }
+
+    @InterfaceAudience.Private
+    Builder setPartitioner(String partitionerClassName, @Nullable Map<String, String> partitionerConf) {
+      Preconditions.checkNotNull(partitionerClassName, "Partitioner class name cannot be null");
+      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, partitionerClassName);
+      if (partitionerConf != null) {
+        // Merging the confs for now. Change to be specific in the future.
+        ConfigUtils.mergeConfsWithExclusions(this.conf, partitionerConf,
+            TezRuntimeConfiguration.getRuntimeConfigKeySet());
+      }
+      return this;
+    }
+
+    @Override
+    public Builder setSortBufferSize(int sortBufferSize) {
+      this.conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, sortBufferSize);
+      return this;
+    }
+
+    @Override
+    public Builder setCombiner(String combinerClassName) {
+      return this.setCombiner(combinerClassName, null);
+    }
+
+    @Override
+    public Builder setCombiner(String combinerClassName, Map<String, String> combinerConf) {
+      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS, combinerClassName);
+      if (combinerConf != null) {
+        // Merging the confs for now. Change to be specific in the future.
+        ConfigUtils.mergeConfsWithExclusions(this.conf, combinerConf,
+            TezRuntimeConfiguration.getRuntimeConfigKeySet());
+      }
+      return this;
+    }
+
+    @Override
+    public Builder setSorterNumThreads(int numThreads) {
+      this.conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS, numThreads);
+      return this;
+    }
+
+    @Override
+    public Builder setAdditionalConfiguration(String key, String value) {
+      Preconditions.checkNotNull(key, "Key cannot be null");
+      if (ConfigUtils.doesKeyQualify(key,
+          Lists.newArrayList(OrderedPartitionedKVOutput.getConfigurationKeySet(),
+              TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()),
+          TezRuntimeConfiguration.getAllowedPrefixes())) {
+        if (value == null) {
+          this.conf.unset(key);
+        } else {
+          this.conf.set(key, value);
+        }
+      }
+      return this;
+    }
+
+    @Override
+    public Builder setAdditionalConfiguration(Map<String, String> confMap) {
+      Preconditions.checkNotNull(confMap, "ConfMap cannot be null");
+      Map<String, String> map = ConfigUtils.extractConfigurationMap(confMap,
+          Lists.newArrayList(OrderedPartitionedKVOutput.getConfigurationKeySet(),
+              TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()), TezRuntimeConfiguration.getAllowedPrefixes());
+      ConfigUtils.addConfigMapToConfiguration(this.conf, map);
+      return this;
+    }
+
+    @Override
+    public Builder setFromConfiguration(Configuration conf) {
+      // Maybe ensure this is the first call ? Otherwise this can end up overriding other parameters
+      Preconditions.checkArgument(conf != null, "Configuration cannot be null");
+      Map<String, String> map = ConfigUtils.extractConfigurationMap(conf,
+          Lists.newArrayList(OrderedPartitionedKVOutput.getConfigurationKeySet(),
+              TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()), TezRuntimeConfiguration.getAllowedPrefixes());
+      ConfigUtils.addConfigMapToConfiguration(this.conf, map);
+      return this;
+    }
+
+    /**
+     * Set the key comparator class
+     *
+     * @param comparatorClassName the key comparator class name
+     * @return instance of the current builder
+     */
+    public Builder setKeyComparatorClass(String comparatorClassName) {
+      return this.setKeyComparatorClass(comparatorClassName, null);
+    }
+
+    /**
+     * Set the key comparator class and its associated configuration. This method should only be
+     * used if the comparator requires some specific configuration, which is typically not the
+     * case. {@link #setKeyComparatorClass(String)} is the preferred method for setting a
+     * comparator.
+     *
+     * @param comparatorClassName the key comparator class name
+     * @param comparatorConf      the comparator configuration. This can be null, and is a {@link
+     *                            java.util.Map} of key-value pairs. The keys should be limited to
+     *                            the ones required by the comparator.
+     * @return instance of the current builder
+     */
+    public Builder setKeyComparatorClass(String comparatorClassName,
+                                         @Nullable Map<String, String> comparatorConf) {
+      Preconditions.checkNotNull(comparatorClassName, "Comparator class name cannot be null");
+      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS,
+          comparatorClassName);
+      if (comparatorConf != null) {
+        // Merging the confs for now. Change to be specific in the future.
+        ConfigUtils.mergeConfsWithExclusions(this.conf, comparatorConf,
+            TezRuntimeConfiguration.getRuntimeConfigKeySet());
+      }
+      return this;
+    }
+
+    public Builder setCompression(boolean enabled, @Nullable String compressionCodec,
+                                  @Nullable Map<String, String> codecConf) {
+      this.conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, enabled);
+      if (enabled && compressionCodec != null) {
+        this.conf
+            .set(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, compressionCodec);
+      }
+      if (codecConf != null) {
+        // Merging the confs for now. Change to be specific in the future.
+        ConfigUtils.mergeConfsWithExclusions(this.conf, codecConf,
+            TezRuntimeConfiguration.getRuntimeConfigKeySet());
+      }
+      return this;
+    }
+
+    /**
+     * Set the key serialization class and the comparator to be used for sorting.
+     * Providing a custom serialization class can change the way keys need to be compared
+     * when sorting. Providing an invalid comparator here can produce invalid results.
+     *
+     * @param serializationClassName the key serialization class name
+     * @param comparatorClassName    the key comparator class name
+     * @param serializerConf         the serializer configuration. This can be null, and is a
+     *                               {@link java.util.Map} of key-value pairs. The keys should be limited
+     *                               to the ones required by the serializer.
+     * @return instance of the current builder
+     */
+    public Builder setKeySerializationClass(String serializationClassName,
+        String comparatorClassName, @Nullable Map<String, String> serializerConf) {
+      Preconditions.checkArgument(serializationClassName != null,
+          "serializationClassName cannot be null");
+      Preconditions.checkArgument(comparatorClassName != null,
+          "comparator cannot be null");
+      this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + ","
+          + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
+      setKeyComparatorClass(comparatorClassName, null);
+      if (serializerConf != null) {
+        // Merging the confs for now. Change to be specific in the future.
+        ConfigUtils.mergeConfsWithExclusions(this.conf, serializerConf,
+            TezRuntimeConfiguration.getRuntimeConfigKeySet());
+      }
+      return this;
+    }
+
+    /**
+     * Set the serialization class responsible for providing the serializer/deserializer for values.
+     *
+     * @param serializationClassName the value serialization class name
+     * @param serializerConf         the serializer configuration. This can be null, and is a
+     *                               {@link java.util.Map} of key-value pairs. The keys should be limited
+     *                               to the ones required by the serializer.
+     * @return instance of the current builder
+     */
+    public Builder setValueSerializationClass(String serializationClassName,
+                                              @Nullable Map<String, String> serializerConf) {
+      Preconditions.checkArgument(serializationClassName != null,
+          "serializationClassName cannot be null");
+      this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + ","
+          + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
+      if (serializerConf != null) {
+        // Merging the confs for now. Change to be specific in the future.
+        ConfigUtils.mergeConfsWithExclusions(this.conf, serializerConf,
+            TezRuntimeConfiguration.getRuntimeConfigKeySet());
+      }
+      return this;
+    }
+
+    /**
+     * Create the actual configuration instance.
+     *
+     * @return an instance of the Configuration
+     */
+    public OrderedPartitionedKVOutputConfig build() {
+      return new OrderedPartitionedKVOutputConfig(this.conf);
+    }
+  }
+}
+


Mime
View raw message