tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject [5/6] TEZ-1132. Consistent naming of Input and Outputs (bikas)
Date Sat, 16 Aug 2014 03:05:26 GMT
http://git-wip-us.apache.org/repos/asf/tez/blob/469bf905/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedPartitionedKVOutputConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedPartitionedKVOutputConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedPartitionedKVOutputConfigurer.java
deleted file mode 100644
index 68a522c..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedPartitionedKVOutputConfigurer.java
+++ /dev/null
@@ -1,304 +0,0 @@
-/*
- * *
- *  * Licensed to the Apache Software Foundation (ASF) under one
- *  * or more contributor license agreements.  See the NOTICE file
- *  * distributed with this work for additional information
- *  * regarding copyright ownership.  The ASF licenses this file
- *  * to you under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance
- *  * with the License.  You may obtain a copy of the License at
- *  *
- *  *     http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.tez.runtime.library.conf;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.util.Map;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
-import org.apache.tez.runtime.library.common.ConfigUtils;
-import org.apache.tez.runtime.library.output.OnFileUnorderedPartitionedKVOutput;
-
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-/**
- * Configure {@link org.apache.tez.runtime.library.output.OnFileUnorderedPartitionedKVOutput} </p>
- *
- * Values will be picked up from tez-site if not specified, otherwise defaults from
- * {@link org.apache.tez.runtime.library.api.TezRuntimeConfiguration} will be used.
- */
-public class OnFileUnorderedPartitionedKVOutputConfigurer {
-  /**
-   * Configure parameters which are specific to the Output.
-   */
-  @InterfaceAudience.Private
-  public static interface SpecificConfigurer<T> extends BaseConfigurer<T> {
-    /**
-     * Set the buffer size to use
-     *
-     * @param availableBufferSize the size of the buffer in MB
-     * @return instance of the current builder
-     */
-    public T setAvailableBufferSize(int availableBufferSize);
-  }
-
-  @SuppressWarnings("rawtypes")
-  @InterfaceAudience.Public
-  @InterfaceStability.Evolving
-  public static class SpecificBuilder<E extends HadoopKeyValuesBasedBaseEdgeConfigurer.Builder> implements
-      SpecificConfigurer<SpecificBuilder> {
-
-    private final E edgeBuilder;
-    private final Builder builder;
-
-    SpecificBuilder(E edgeBuilder, Builder builder) {
-      this.edgeBuilder = edgeBuilder;
-      this.builder = builder;
-    }
-
-    @Override
-    public SpecificBuilder<E> setAvailableBufferSize(int availableBufferSize) {
-      builder.setAvailableBufferSize(availableBufferSize);
-      return this;
-    }
-
-    @Override
-    public SpecificBuilder setAdditionalConfiguration(String key, String value) {
-      builder.setAdditionalConfiguration(key, value);
-      return this;
-    }
-
-    @Override
-    public SpecificBuilder setAdditionalConfiguration(Map<String, String> confMap) {
-      builder.setAdditionalConfiguration(confMap);
-      return this;
-    }
-
-    @Override
-    public SpecificBuilder setFromConfiguration(Configuration conf) {
-      builder.setFromConfiguration(conf);
-      return this;
-    }
-
-    public E done() {
-      return edgeBuilder;
-    }
-  }
-
-  @InterfaceAudience.Private
-  @VisibleForTesting
-  Configuration conf;
-
-  @InterfaceAudience.Private
-  @VisibleForTesting
-  OnFileUnorderedPartitionedKVOutputConfigurer() {
-  }
-
-  private OnFileUnorderedPartitionedKVOutputConfigurer(Configuration conf) {
-    this.conf = conf;
-  }
-
-  /**
-   * Get a UserPayload representation of the Configuration
-   * @return a {@link org.apache.tez.dag.api.UserPayload} instance
-   */
-  public UserPayload toUserPayload() {
-    try {
-      return TezUtils.createUserPayloadFromConf(conf);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @InterfaceAudience.Private
-  public void fromUserPayload(UserPayload payload) {
-    try {
-      this.conf = TezUtils.createConfFromUserPayload(payload);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-
-  public static Builder newBuilder(String keyClass, String valClass, String partitionerClassName) {
-    return newBuilder(keyClass, valClass, partitionerClassName, null);
-  }
-
-  public static Builder newBuilder(String keyClass, String valClass, String partitionerClassName,
-                                   Map<String, String> partitionerConf) {
-    return new Builder(keyClass, valClass, partitionerClassName, partitionerConf);
-  }
-
-  @InterfaceAudience.Public
-  @InterfaceStability.Evolving
-  public static class Builder implements SpecificConfigurer<Builder> {
-
-    private final Configuration conf = new Configuration(false);
-
-    /**
-     * Create a configuration builder for {@link org.apache.tez.runtime.library.output.OnFileUnorderedPartitionedKVOutput}
-     *
-     * @param keyClassName         the key class name
-     * @param valueClassName       the value class name
-     * @param partitionerClassName the partitioner class name
-     * @param partitionerConf      configuration for the partitioner specified as a map of key-value
-     *                             pairs. This can be null
-     */
-    @InterfaceAudience.Private
-    Builder(String keyClassName, String valueClassName, String partitionerClassName,
-                   Map<String, String> partitionerConf) {
-      this();
-      Preconditions.checkNotNull(keyClassName, "Key class name cannot be null");
-      Preconditions.checkNotNull(valueClassName, "Value class name cannot be null");
-      Preconditions.checkNotNull(partitionerClassName, "Partitioner class name cannot be null");
-      setKeyClassName(keyClassName);
-      setValueClassName(valueClassName);
-      setPartitioner(partitionerClassName, partitionerConf);
-    }
-
-    @InterfaceAudience.Private
-    Builder() {
-      Map<String, String> tezDefaults = ConfigUtils
-          .extractConfigurationMap(TezRuntimeConfiguration.getTezRuntimeConfigDefaults(),
-              OnFileUnorderedPartitionedKVOutput.getConfigurationKeySet());
-      ConfigUtils.addConfigMapToConfiguration(this.conf, tezDefaults);
-      ConfigUtils.addConfigMapToConfiguration(this.conf, TezRuntimeConfiguration.getOtherConfigDefaults());
-    }
-
-    @InterfaceAudience.Private
-    Builder setKeyClassName(String keyClassName) {
-      Preconditions.checkNotNull(keyClassName, "Key class name cannot be null");
-      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, keyClassName);
-      return this;
-    }
-
-    @InterfaceAudience.Private
-    Builder setValueClassName(String valueClassName) {
-      Preconditions.checkNotNull(valueClassName, "Value class name cannot be null");
-      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, valueClassName);
-      return this;
-    }
-
-    @InterfaceAudience.Private
-    Builder setPartitioner(String partitionerClassName, Map<String, String> partitionerConf) {
-      Preconditions.checkNotNull(partitionerClassName, "Partitioner class name cannot be null");
-      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, partitionerClassName);
-      if (partitionerConf != null) {
-        // Merging the confs for now. Change to be specific in the future.
-        ConfigUtils.mergeConfsWithExclusions(this.conf, partitionerConf,
-            TezRuntimeConfiguration.getRuntimeConfigKeySet());
-      }
-      return this;
-    }
-
-    @Override
-    public Builder setAvailableBufferSize(int availableBufferSize) {
-      this.conf
-          .setInt(TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB, availableBufferSize);
-      return this;
-    }
-
-    @Override
-    public Builder setAdditionalConfiguration(String key, String value) {
-      Preconditions.checkNotNull(key, "Key cannot be null");
-      if (ConfigUtils.doesKeyQualify(key,
-          Lists.newArrayList(OnFileUnorderedPartitionedKVOutput.getConfigurationKeySet(),
-              TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()),
-          TezRuntimeConfiguration.getAllowedPrefixes())) {
-        if (value == null) {
-          this.conf.unset(key);
-        } else {
-          this.conf.set(key, value);
-        }
-      }
-      return this;
-    }
-
-    @Override
-    public Builder setAdditionalConfiguration(Map<String, String> confMap) {
-      Preconditions.checkNotNull(confMap, "ConfMap cannot be null");
-      Map<String, String> map = ConfigUtils.extractConfigurationMap(confMap,
-          Lists.newArrayList(OnFileUnorderedPartitionedKVOutput.getConfigurationKeySet(),
-              TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()), TezRuntimeConfiguration.getAllowedPrefixes());
-      ConfigUtils.addConfigMapToConfiguration(this.conf, map);
-      return this;
-    }
-
-    @Override
-    public Builder setFromConfiguration(Configuration conf) {
-      // Maybe ensure this is the first call ? Otherwise this can end up overriding other parameters
-      Preconditions.checkArgument(conf != null, "Configuration cannot be null");
-      Map<String, String> map = ConfigUtils.extractConfigurationMap(conf,
-          Lists.newArrayList(OnFileUnorderedPartitionedKVOutput.getConfigurationKeySet(),
-              TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()), TezRuntimeConfiguration.getAllowedPrefixes());
-      ConfigUtils.addConfigMapToConfiguration(this.conf, map);
-      return this;
-    }
-
-    public Builder setCompression(boolean enabled, @Nullable String compressionCodec) {
-      this.conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, enabled);
-      if (enabled && compressionCodec != null) {
-        this.conf
-            .set(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, compressionCodec);
-      }
-      return this;
-    }
-
-    /**
-     * Set serialization class responsible for providing serializer/deserializer for keys.
-     *
-     * @param serializationClassName
-     * @return
-     */
-    public Builder setKeySerializationClass(String serializationClassName) {
-      Preconditions.checkArgument(serializationClassName != null,
-          "serializationClassName cannot be null");
-      this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + ","
-          + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
-      return this;
-    }
-
-    /**
-     * Set serialization class responsible for providing serializer/deserializer for values.
-     *
-     * @param serializationClassName
-     * @return
-     */
-    public Builder setValueSerializationClass(String serializationClassName) {
-      Preconditions.checkArgument(serializationClassName != null,
-          "serializationClassName cannot be null");
-      this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + ","
-          + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
-      return this;
-    }
-
-    /**
-     * Create the actual configuration instance.
-     *
-     * @return an instance of the Configuration
-     */
-    public OnFileUnorderedPartitionedKVOutputConfigurer build() {
-      return new OnFileUnorderedPartitionedKVOutputConfigurer(this.conf);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/469bf905/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedGroupedKVInputConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedGroupedKVInputConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedGroupedKVInputConfigurer.java
new file mode 100644
index 0000000..016cbfd
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedGroupedKVInputConfigurer.java
@@ -0,0 +1,492 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.tez.runtime.library.conf;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Map;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.input.OrderedGroupedKVInput;
+import org.apache.tez.runtime.library.input.OrderedGroupedInputLegacy;
+
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+/**
+ * Configure {@link org.apache.tez.runtime.library.input.OrderedGroupedKVInput} <p>
+ *
+ * Values will be picked up from tez-site if not specified, otherwise defaults from
+ * {@link org.apache.tez.runtime.library.api.TezRuntimeConfiguration} will be used.
+ */
+public class OrderedGroupedKVInputConfigurer {
+
+  /**
+   * Configure parameters which are specific to the Input.
+   */
+  @InterfaceAudience.Private
+  public static interface SpecificConfigurer<T> extends BaseConfigurer<T> {
+
+    /**
+     * Specifies whether the legacy version of this input should be used.
+     * @return instance of the current builder
+     */
+    public T useLegacyInput();
+    /**
+     * Sets the buffer fraction, as a fraction of container size, to be used while fetching remote
+     * data.
+     *
+     * @param shuffleBufferFraction fraction of container size
+     * @return instance of the current builder
+     */
+    public T setShuffleBufferFraction(float shuffleBufferFraction);
+
+    /**
+     * Sets the buffer fraction, as a fraction of container size, to be used after the fetch and
+     * merge are complete. This buffer is used to cache merged data and avoids writing it out to
+     * disk.
+     *
+     * @param postMergeBufferFraction fraction of container size
+     * @return instance of the current builder
+     */
+    public T setPostMergeBufferFraction(float postMergeBufferFraction);
+
+    /**
+     * Sets a size limit on the maximum segment size to be shuffled to disk. This is a fraction of
+     * the shuffle buffer.
+     *
+     * @param maxSingleSegmentFraction fraction of memory determined by ShuffleBufferFraction
+     * @return instance of the current builder
+     */
+    public T setMaxSingleMemorySegmentFraction(float maxSingleSegmentFraction);
+
+    /**
+     * Enable the memory to memory merger
+     *
+     * @param enable whether to enable the memory to memory merger
+     * @return instance of the current builder
+     */
+    public T setMemToMemMerger(boolean enable); // Not super useful until additional params are used.
+
+    /**
+     * Configure the point at which in memory segments will be merged. This is specified as a
+     * fraction of the shuffle buffer.
+     *
+     * @param mergeFraction fraction of memory determined by ShuffleBufferFraction, which when
+     *                      filled, will
+     *                      trigger a merge
+     * @return instance of the current builder
+     */
+    public T setMergeFraction(float mergeFraction);
+
+    /**
+     * Configure the combiner class
+     *
+     * @param combinerClassName the combiner class name
+     * @return instance of the current builder
+     */
+    public T setCombiner(String combinerClassName);
+
+    /**
+     * Configure the combiner class and its associated configuration (specified as key-value
+     * pairs). This method should only be used if the combiner requires some specific configuration.
+     * {@link #setCombiner(String)} is the preferred method for setting a combiner.
+     *
+     * @param combinerClassName the combiner class name
+     * @param combinerConf      the combiner configuration. This can be null, and otherwise
+     *                          is a {@link java.util.Map} of key-value pairs. The keys should
+     *                          be limited to the ones required by the combiner.
+     * @return instance of the current builder
+     */
+    public T setCombiner(String combinerClassName, @Nullable Map<String, String> combinerConf);
+
+  }
+
+  @SuppressWarnings("rawtypes")
+  @InterfaceAudience.Public
+  @InterfaceStability.Evolving
+  public static class SpecificBuilder<E extends HadoopKeyValuesBasedBaseEdgeConfigurer.Builder> implements
+      SpecificConfigurer<SpecificBuilder> {
+
+    private final E edgeBuilder;
+    private final OrderedGroupedKVInputConfigurer.Builder builder;
+
+
+    @InterfaceAudience.Private
+    SpecificBuilder(E edgeBuilder, OrderedGroupedKVInputConfigurer.Builder builder) {
+      this.edgeBuilder = edgeBuilder;
+      this.builder = builder;
+    }
+
+    public SpecificBuilder<E> useLegacyInput() {
+      builder.useLegacyInput();
+      return this;
+    }
+
+    @Override
+    public SpecificBuilder<E> setShuffleBufferFraction(float shuffleBufferFraction) {
+      builder.setShuffleBufferFraction(shuffleBufferFraction);
+      return this;
+    }
+
+    @Override
+    public SpecificBuilder<E> setPostMergeBufferFraction(float postMergeBufferFraction) {
+      builder.setPostMergeBufferFraction(postMergeBufferFraction);
+      return this;
+    }
+
+    @Override
+    public SpecificBuilder<E> setMaxSingleMemorySegmentFraction(float maxSingleSegmentFraction) {
+      builder.setMaxSingleMemorySegmentFraction(maxSingleSegmentFraction);
+      return this;
+    }
+
+    @Override
+    public SpecificBuilder<E> setMemToMemMerger(boolean enable) {
+      builder.setMemToMemMerger(enable);
+      return this;
+    }
+
+    @Override
+    public SpecificBuilder<E> setMergeFraction(float mergeFraction) {
+      builder.setMergeFraction(mergeFraction);
+      return this;
+    }
+
+    @Override
+    public SpecificBuilder<E> setCombiner(String combinerClassName) {
+      return setCombiner(combinerClassName, null);
+    }
+
+    @Override
+    public SpecificBuilder<E> setCombiner(String combinerClassName, Map<String, String> combinerConf) {
+      builder.setCombiner(combinerClassName, combinerConf);
+      return this;
+    }
+
+
+    @Override
+    public SpecificBuilder setAdditionalConfiguration(String key, String value) {
+      builder.setAdditionalConfiguration(key, value);
+      return this;
+    }
+
+    @Override
+    public SpecificBuilder setAdditionalConfiguration(Map<String, String> confMap) {
+      builder.setAdditionalConfiguration(confMap);
+      return this;
+    }
+
+    @Override
+    public SpecificBuilder setFromConfiguration(Configuration conf) {
+      builder.setFromConfiguration(conf);
+      return this;
+    }
+
+    public E done() {
+      return edgeBuilder;
+    }
+
+  }
+
+  @InterfaceAudience.Private
+  @VisibleForTesting
+  Configuration conf;
+
+  private String inputClassName;
+
+  @InterfaceAudience.Private
+  @VisibleForTesting
+  OrderedGroupedKVInputConfigurer() {
+  }
+
+  private OrderedGroupedKVInputConfigurer(Configuration conf, boolean useLegacyInput) {
+    this.conf = conf;
+    if (useLegacyInput) {
+      inputClassName = OrderedGroupedInputLegacy.class.getName();
+    } else {
+      inputClassName = OrderedGroupedKVInput.class.getName();
+    }
+  }
+
+  /**
+   * Get a UserPayload representation of the Configuration
+   * @return a {@link org.apache.tez.dag.api.UserPayload} instance
+   */
+  public UserPayload toUserPayload() {
+    try {
+      return TezUtils.createUserPayloadFromConf(conf);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @InterfaceAudience.Private
+  public void fromUserPayload(UserPayload payload) {
+    try {
+      this.conf = TezUtils.createConfFromUserPayload(payload);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public String getInputClassName() {
+    return inputClassName;
+  }
+
+  public static Builder newBuilder(String keyClass, String valueClass) {
+    return new Builder(keyClass, valueClass);
+  }
+
+  @InterfaceAudience.Public
+  @InterfaceStability.Evolving
+  public static class Builder implements SpecificConfigurer<Builder> {
+
+    private final Configuration conf = new Configuration(false);
+    private boolean useLegacyInput = false;
+
+    /**
+     * Create a configuration builder for {@link org.apache.tez.runtime.library.input.OrderedGroupedKVInput}
+     *
+     * @param keyClassName         the key class name
+     * @param valueClassName       the value class name
+     */
+    @InterfaceAudience.Private
+    Builder(String keyClassName, String valueClassName) {
+      this();
+      Preconditions.checkNotNull(keyClassName, "Key class name cannot be null");
+      Preconditions.checkNotNull(valueClassName, "Value class name cannot be null");
+      setKeyClassName(keyClassName);
+      setValueClassName(valueClassName);
+    }
+
+    @InterfaceAudience.Private
+    Builder() {
+      Map<String, String> tezDefaults = ConfigUtils
+          .extractConfigurationMap(TezRuntimeConfiguration.getTezRuntimeConfigDefaults(),
+              OrderedGroupedKVInput.getConfigurationKeySet());
+      ConfigUtils.addConfigMapToConfiguration(this.conf, tezDefaults);
+      ConfigUtils.addConfigMapToConfiguration(this.conf, TezRuntimeConfiguration.getOtherConfigDefaults());
+    }
+
+    @InterfaceAudience.Private
+    Builder setKeyClassName(String keyClassName) {
+      Preconditions.checkNotNull(keyClassName, "Key class name cannot be null");
+      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, keyClassName);
+      return this;
+    }
+
+    @InterfaceAudience.Private
+    Builder setValueClassName(String valueClassName) {
+      Preconditions.checkNotNull(valueClassName, "Value class name cannot be null");
+      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, valueClassName);
+      return this;
+    }
+
+    public Builder useLegacyInput() {
+      this.useLegacyInput = true;
+      return this;
+    }
+
+    @Override
+    public Builder setShuffleBufferFraction(float shuffleBufferFraction) {
+      this.conf
+          .setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, shuffleBufferFraction);
+      return this;
+    }
+
+    @Override
+    public Builder setPostMergeBufferFraction(float postMergeBufferFraction) {
+      this.conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_BUFFER_PERCENT, postMergeBufferFraction);
+      return this;
+    }
+
+    @Override
+    public Builder setMaxSingleMemorySegmentFraction(float maxSingleSegmentFraction) {
+      this.conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT,
+          maxSingleSegmentFraction);
+      return this;
+    }
+
+    @Override
+    public Builder setMemToMemMerger(boolean enable) {
+      this.conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM, enable);
+      return this;
+    }
+
+    @Override
+    public Builder setMergeFraction(float mergeFraction) {
+      this.conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, mergeFraction);
+      return this;
+    }
+
+    public Builder setCombiner(String combinerClassName) {
+      return setCombiner(combinerClassName, null);
+    }
+
+    @Override
+    public Builder setCombiner(String combinerClassName, Map<String, String> combinerConf) {
+      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS, combinerClassName);
+      if (combinerConf != null) {
+        // Merging the confs for now. Change to be specific in the future.
+        ConfigUtils.mergeConfsWithExclusions(this.conf, combinerConf,
+            TezRuntimeConfiguration.getRuntimeConfigKeySet());
+      }
+      return this;
+    }
+
+    /**
+     * Set the key comparator class
+     *
+     * @param comparatorClassName the key comparator class name
+     * @return instance of the current builder
+     */
+    public Builder setKeyComparatorClass(String comparatorClassName) {
+      return this.setKeyComparatorClass(comparatorClassName, null);
+    }
+
+    /**
+     * Set the key comparator class and its associated configuration. This method should only be
+     * used if the comparator requires some specific configuration, which is typically not the
+     * case. {@link #setKeyComparatorClass(String)} is the preferred method for setting a
+     * comparator.
+     *
+     * @param comparatorClassName the key comparator class name
+     * @param comparatorConf      the comparator configuration. This can be null, and is a {@link
+     *                            java.util.Map} of key-value pairs. The keys should be limited to
+     *                            the ones required by the comparator.
+     * @return instance of the current builder
+     */
+    public Builder setKeyComparatorClass(String comparatorClassName,
+                                         @Nullable Map<String, String> comparatorConf) {
+      Preconditions.checkNotNull(comparatorClassName, "Comparator class name cannot be null");
+      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS,
+          comparatorClassName);
+      if (comparatorConf != null) {
+        // Merging the confs for now. Change to be specific in the future.
+        ConfigUtils.mergeConfsWithExclusions(this.conf, comparatorConf,
+            TezRuntimeConfiguration.getRuntimeConfigKeySet());
+      }
+      return this;
+    }
+    @Override
+    public Builder setAdditionalConfiguration(String key, String value) {
+      Preconditions.checkNotNull(key, "Key cannot be null");
+      if (ConfigUtils.doesKeyQualify(key,
+          Lists.newArrayList(OrderedGroupedKVInput.getConfigurationKeySet(),
+              TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()),
+          TezRuntimeConfiguration.getAllowedPrefixes())) {
+        if (value == null) {
+          this.conf.unset(key);
+        } else {
+          this.conf.set(key, value);
+        }
+      }
+      return this;
+    }
+
+    @Override
+    public Builder setAdditionalConfiguration(Map<String, String> confMap) {
+      Preconditions.checkNotNull(confMap, "ConfMap cannot be null");
+      Map<String, String> map = ConfigUtils.extractConfigurationMap(confMap,
+          Lists.newArrayList(OrderedGroupedKVInput.getConfigurationKeySet(),
+              TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()), TezRuntimeConfiguration.getAllowedPrefixes());
+      ConfigUtils.addConfigMapToConfiguration(this.conf, map);
+      return this;
+    }
+
+    @Override
+    public Builder setFromConfiguration(Configuration conf) {
+      // Maybe ensure this is the first call ? Otherwise this can end up overriding other parameters
+      Preconditions.checkArgument(conf != null, "Configuration cannot be null");
+      Map<String, String> map = ConfigUtils.extractConfigurationMap(conf,
+          Lists.newArrayList(OrderedGroupedKVInput.getConfigurationKeySet(),
+              TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()), TezRuntimeConfiguration.getAllowedPrefixes());
+      ConfigUtils.addConfigMapToConfiguration(this.conf, map);
+      return this;
+    }
+
+    public Builder setCompression(boolean enabled, @Nullable String compressionCodec) {
+      this.conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, enabled);
+      if (enabled && compressionCodec != null) {
+        this.conf
+            .set(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, compressionCodec);
+      }
+      return this;
+    }
+
+    /**
+     * Set the serialization class and the relevant comparator to be used for sorting.
+     * Providing a custom serialization class could change the way keys need to be compared in
+     * sorting. Providing an invalid comparator here could produce invalid results.
+     *
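+     * @param serializationClassName the serialization class name; must not be null
+     * @param comparatorClassName the key comparator class name; must not be null
+     * @return instance of the current builder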
+     */
+    public Builder setKeySerializationClass(String serializationClassName,
+        String comparatorClassName) {
+      Preconditions.checkArgument(serializationClassName != null,
+          "serializationClassName cannot be null");
+      Preconditions.checkArgument(comparatorClassName != null,
+          "comparator cannot be null");
+      this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + ","
+          + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
+      setKeyComparatorClass(comparatorClassName, null);
+      return this;
+    }
+
+    /**
+     * Serialization class to be used for serializing values.
+     *
+     * @param serializationClassName the serialization class name; must not be null
+     * @return instance of the current builder
+     */
+    public Builder setValueSerializationClass(String serializationClassName) {
+      Preconditions.checkArgument(serializationClassName != null,
+          "serializationClassName cannot be null");
+      this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + ","
+          + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
+      return this;
+    }
+
+    /**
+     * Create the actual configuration instance.
+     *
+     * @return an instance of the Configuration
+     */
+    public OrderedGroupedKVInputConfigurer build() {
+      return new OrderedGroupedKVInputConfigurer(this.conf, this.useLegacyInput);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/469bf905/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfigurer.java
index e3a58b9..a9c18b1 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfigurer.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfigurer.java
@@ -32,10 +32,10 @@ import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.runtime.library.output.OnFileSortedOutput;
+import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;
 
 /**
- * Configure payloads for the OnFileSortedOutput and ShuffledMergedInput pair </p>
+ * Configure payloads for the OrderedPartitionedKVOutput and OrderedGroupedKVInput pair. <p>
  *
  * Values will be picked up from tez-site if not specified, otherwise defaults from
  * {@link org.apache.tez.runtime.library.api.TezRuntimeConfiguration} will be used.
@@ -44,12 +44,12 @@ import org.apache.tez.runtime.library.output.OnFileSortedOutput;
 @InterfaceStability.Evolving
 public class OrderedPartitionedKVEdgeConfigurer extends HadoopKeyValuesBasedBaseEdgeConfigurer {
 
-  private final OnFileSortedOutputConfigurer outputConf;
-  private final ShuffledMergedInputConfigurer inputConf;
+  private final OrderedPartitionedKVOutputConfigurer outputConf;
+  private final OrderedGroupedKVInputConfigurer inputConf;
 
   private OrderedPartitionedKVEdgeConfigurer(
-      OnFileSortedOutputConfigurer outputConfiguration,
-      ShuffledMergedInputConfigurer inputConfiguration) {
+      OrderedPartitionedKVOutputConfigurer outputConfiguration,
+      OrderedGroupedKVInputConfigurer inputConfiguration) {
     this.outputConf = outputConfiguration;
     this.inputConf = inputConfiguration;
   }
@@ -94,7 +94,7 @@ public class OrderedPartitionedKVEdgeConfigurer extends HadoopKeyValuesBasedBase
 
   @Override
   public String getOutputClassName() {
-    return OnFileSortedOutput.class.getName();
+    return OrderedPartitionedKVOutput.class.getName();
   }
 
   @Override
@@ -146,18 +146,18 @@ public class OrderedPartitionedKVEdgeConfigurer extends HadoopKeyValuesBasedBase
   @InterfaceStability.Evolving
   public static class Builder extends HadoopKeyValuesBasedBaseEdgeConfigurer.Builder<Builder> {
 
-    private final OnFileSortedOutputConfigurer.Builder outputBuilder =
-        new OnFileSortedOutputConfigurer.Builder();
-    private final OnFileSortedOutputConfigurer.SpecificBuilder<OrderedPartitionedKVEdgeConfigurer.Builder>
+    private final OrderedPartitionedKVOutputConfigurer.Builder outputBuilder =
+        new OrderedPartitionedKVOutputConfigurer.Builder();
+    private final OrderedPartitionedKVOutputConfigurer.SpecificBuilder<OrderedPartitionedKVEdgeConfigurer.Builder>
         specificOutputBuilder =
-        new OnFileSortedOutputConfigurer.SpecificBuilder<OrderedPartitionedKVEdgeConfigurer.Builder>(
+        new OrderedPartitionedKVOutputConfigurer.SpecificBuilder<OrderedPartitionedKVEdgeConfigurer.Builder>(
             this, outputBuilder);
 
-    private final ShuffledMergedInputConfigurer.Builder inputBuilder =
-        new ShuffledMergedInputConfigurer.Builder();
-    private final ShuffledMergedInputConfigurer.SpecificBuilder<OrderedPartitionedKVEdgeConfigurer.Builder>
+    private final OrderedGroupedKVInputConfigurer.Builder inputBuilder =
+        new OrderedGroupedKVInputConfigurer.Builder();
+    private final OrderedGroupedKVInputConfigurer.SpecificBuilder<OrderedPartitionedKVEdgeConfigurer.Builder>
         specificInputBuilder =
-        new ShuffledMergedInputConfigurer.SpecificBuilder<OrderedPartitionedKVEdgeConfigurer.Builder>(this,
+        new OrderedGroupedKVInputConfigurer.SpecificBuilder<OrderedPartitionedKVEdgeConfigurer.Builder>(this,
             inputBuilder);
 
     @InterfaceAudience.Private
@@ -260,7 +260,7 @@ public class OrderedPartitionedKVEdgeConfigurer extends HadoopKeyValuesBasedBase
      * Configure the specific output
      * @return a builder to configure the output
      */
-    public OnFileSortedOutputConfigurer.SpecificBuilder<Builder> configureOutput() {
+    public OrderedPartitionedKVOutputConfigurer.SpecificBuilder<Builder> configureOutput() {
       return specificOutputBuilder;
     }
 
@@ -268,7 +268,7 @@ public class OrderedPartitionedKVEdgeConfigurer extends HadoopKeyValuesBasedBase
      * Configure the specific input
      * @return a builder to configure the input
      */
-    public ShuffledMergedInputConfigurer.SpecificBuilder<Builder> configureInput() {
+    public OrderedGroupedKVInputConfigurer.SpecificBuilder<Builder> configureInput() {
       return specificInputBuilder;
     }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/469bf905/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVOutputConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVOutputConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVOutputConfigurer.java
new file mode 100644
index 0000000..72063e0
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVOutputConfigurer.java
@@ -0,0 +1,418 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.tez.runtime.library.conf;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Map;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;
+
+
+/**
+ * Configure {@link org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput}. <p>
+ *
+ * Values will be picked up from tez-site if not specified, otherwise defaults from
+ * {@link org.apache.tez.runtime.library.api.TezRuntimeConfiguration} will be used.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class OrderedPartitionedKVOutputConfigurer {
+
+  /**
+   * Configure parameters which are specific to the Output.
+   */
+  @InterfaceAudience.Private
+  public static interface SpecificConfigurer<T> extends BaseConfigurer<T> {
+    /**
+     * Set the buffer size to use when sorting the output
+     *
+     * @param sortBufferSize the size of the buffer in MB
+     * @return instance of the current builder
+     */
+    public T setSortBufferSize(int sortBufferSize);
+
+
+    /**
+     * Configure the combiner class
+     *
+     * @param combinerClassName the combiner class name
+     * @return instance of the current builder
+     */
+    public T setCombiner(String combinerClassName);
+
+    /**
+     * Configure the combiner class and its associated configuration (specified as key-value
+     * pairs). This method should only be used if the combiner requires some specific configuration.
+     * {@link #setCombiner(String)} is the preferred method for setting a combiner.
+     *
+     * @param combinerClassName the combiner class name
+     * @param combinerConf      the combiner configuration. This can be null, and otherwise
+     *                          is a {@link java.util.Map} of key-value pairs. The keys should
+     *                          be limited to the ones required by the combiner.
+     * @return instance of the current builder
+     */
+    public T setCombiner(String combinerClassName, @Nullable Map<String, String> combinerConf);
+
+
+
+    /**
+     * Configure the number of threads to be used by the sorter
+     *
+     * @param numThreads the number of threads
+     * @return instance of the current builder
+     */
+    public T setSorterNumThreads(int numThreads);
+  }
+
+  @SuppressWarnings("rawtypes")
+  @InterfaceAudience.Public
+  @InterfaceStability.Evolving
+  public static class SpecificBuilder<E extends HadoopKeyValuesBasedBaseEdgeConfigurer.Builder> implements
+      SpecificConfigurer<SpecificBuilder> {
+
+    private final E edgeBuilder;
+    private final Builder builder;
+
+    SpecificBuilder(E edgeBuilder, Builder builder) {
+      this.edgeBuilder = edgeBuilder;
+      this.builder = builder;
+    }
+
+    @Override
+    public SpecificBuilder<E> setSortBufferSize(int sortBufferSize) {
+      builder.setSortBufferSize(sortBufferSize);
+      return this;
+    }
+
+    public SpecificBuilder<E> setCombiner(String combinerClassName) {
+      return this.setCombiner(combinerClassName, null);
+    }
+
+    @Override
+    public SpecificBuilder<E> setCombiner(String combinerClassName, Map<String, String> combinerConf) {
+      builder.setCombiner(combinerClassName, combinerConf);
+      return this;
+    }
+
+    @Override
+    public SpecificBuilder<E> setSorterNumThreads(int numThreads) {
+      builder.setSorterNumThreads(numThreads);
+      return this;
+    }
+
+    @Override
+    public SpecificBuilder setAdditionalConfiguration(String key, String value) {
+      builder.setAdditionalConfiguration(key, value);
+      return this;
+    }
+
+    @Override
+    public SpecificBuilder setAdditionalConfiguration(Map<String, String> confMap) {
+      builder.setAdditionalConfiguration(confMap);
+      return this;
+    }
+
+    @Override
+    public SpecificBuilder setFromConfiguration(Configuration conf) {
+      builder.setFromConfiguration(conf);
+      return this;
+    }
+
+    public E done() {
+      return edgeBuilder;
+    }
+  }
+
+  @InterfaceAudience.Private
+  @VisibleForTesting
+  Configuration conf;
+
+  @InterfaceAudience.Private
+  @VisibleForTesting
+  OrderedPartitionedKVOutputConfigurer() {
+  }
+
+  private OrderedPartitionedKVOutputConfigurer(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Get a UserPayload representation of the Configuration
+   * @return a {@link org.apache.tez.dag.api.UserPayload} instance
+   */
+  public UserPayload toUserPayload() {
+    try {
+      return TezUtils.createUserPayloadFromConf(conf);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @InterfaceAudience.Private
+  public void fromUserPayload(UserPayload payload) {
+    try {
+      this.conf = TezUtils.createConfFromUserPayload(payload);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public static Builder newBuilder(String keyClass, String valueClass, String partitionerClassName) {
+    return newBuilder(keyClass, valueClass, partitionerClassName, null);
+  }
+
+  public static Builder newBuilder(String keyClass, String valueClass, String partitionerClassName,
+                                   Map<String, String> partitionerConf) {
+    return new Builder(keyClass, valueClass, partitionerClassName, partitionerConf);
+  }
+
+  @InterfaceAudience.Public
+  @InterfaceStability.Evolving
+  public static class Builder implements SpecificConfigurer<Builder> {
+
+    private final Configuration conf = new Configuration(false);
+
+    /**
+     * Create a configuration builder for {@link org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput}
+     *
+     * @param keyClassName         the key class name
+     * @param valueClassName       the value class name
+     * @param partitionerClassName the partitioner class name
+     * @param partitionerConf      the partitioner configuration. This can be null, and is a {@link
+     *                             java.util.Map} of key-value pairs. The keys should be limited to
+     *                             the ones required by the partitioner.
+     */
+    @InterfaceAudience.Private
+    Builder(String keyClassName, String valueClassName, String partitionerClassName,
+                   @Nullable Map<String, String> partitionerConf) {
+      this();
+      Preconditions.checkNotNull(keyClassName, "Key class name cannot be null");
+      Preconditions.checkNotNull(valueClassName, "Value class name cannot be null");
+      Preconditions.checkNotNull(partitionerClassName, "Partitioner class name cannot be null");
+      setKeyClassName(keyClassName);
+      setValueClassName(valueClassName);
+      setPartitioner(partitionerClassName, partitionerConf);
+    }
+
+    @InterfaceAudience.Private
+    Builder() {
+      Map<String, String> tezDefaults = ConfigUtils
+          .extractConfigurationMap(TezRuntimeConfiguration.getTezRuntimeConfigDefaults(),
+              OrderedPartitionedKVOutput.getConfigurationKeySet());
+      ConfigUtils.addConfigMapToConfiguration(this.conf, tezDefaults);
+      ConfigUtils.addConfigMapToConfiguration(this.conf, TezRuntimeConfiguration.getOtherConfigDefaults());
+    }
+
+    @InterfaceAudience.Private
+    Builder setKeyClassName(String keyClassName) {
+      Preconditions.checkNotNull(keyClassName, "Key class name cannot be null");
+      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, keyClassName);
+      return this;
+    }
+
+    @InterfaceAudience.Private
+    Builder setValueClassName(String valueClassName) {
+      Preconditions.checkNotNull(valueClassName, "Value class name cannot be null");
+      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, valueClassName);
+      return this;
+    }
+
+    @InterfaceAudience.Private
+    Builder setPartitioner(String partitionerClassName, @Nullable Map<String, String> partitionerConf) {
+      Preconditions.checkNotNull(partitionerClassName, "Partitioner class name cannot be null");
+      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, partitionerClassName);
+      if (partitionerConf != null) {
+        // Merging the confs for now. Change to be specific in the future.
+        ConfigUtils.mergeConfsWithExclusions(this.conf, partitionerConf,
+            TezRuntimeConfiguration.getRuntimeConfigKeySet());
+      }
+      return this;
+    }
+
+    @Override
+    public Builder setSortBufferSize(int sortBufferSize) {
+      this.conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, sortBufferSize);
+      return this;
+    }
+
+    @Override
+    public Builder setCombiner(String combinerClassName) {
+      return this.setCombiner(combinerClassName, null);
+    }
+
+    @Override
+    public Builder setCombiner(String combinerClassName, Map<String, String> combinerConf) {
+      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS, combinerClassName);
+      if (combinerConf != null) {
+        // Merging the confs for now. Change to be specific in the future.
+        ConfigUtils.mergeConfsWithExclusions(this.conf, combinerConf,
+            TezRuntimeConfiguration.getRuntimeConfigKeySet());
+      }
+      return this;
+    }
+
+    @Override
+    public Builder setSorterNumThreads(int numThreads) {
+      this.conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS, numThreads);
+      return this;
+    }
+
+    @Override
+    public Builder setAdditionalConfiguration(String key, String value) {
+      Preconditions.checkNotNull(key, "Key cannot be null");
+      if (ConfigUtils.doesKeyQualify(key,
+          Lists.newArrayList(OrderedPartitionedKVOutput.getConfigurationKeySet(),
+              TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()),
+          TezRuntimeConfiguration.getAllowedPrefixes())) {
+        if (value == null) {
+          this.conf.unset(key);
+        } else {
+          this.conf.set(key, value);
+        }
+      }
+      return this;
+    }
+
+    @Override
+    public Builder setAdditionalConfiguration(Map<String, String> confMap) {
+      Preconditions.checkNotNull(confMap, "ConfMap cannot be null");
+      Map<String, String> map = ConfigUtils.extractConfigurationMap(confMap,
+          Lists.newArrayList(OrderedPartitionedKVOutput.getConfigurationKeySet(),
+              TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()), TezRuntimeConfiguration.getAllowedPrefixes());
+      ConfigUtils.addConfigMapToConfiguration(this.conf, map);
+      return this;
+    }
+
+    @Override
+    public Builder setFromConfiguration(Configuration conf) {
+      // Maybe ensure this is the first call ? Otherwise this can end up overriding other parameters
+      Preconditions.checkArgument(conf != null, "Configuration cannot be null");
+      Map<String, String> map = ConfigUtils.extractConfigurationMap(conf,
+          Lists.newArrayList(OrderedPartitionedKVOutput.getConfigurationKeySet(),
+              TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()), TezRuntimeConfiguration.getAllowedPrefixes());
+      ConfigUtils.addConfigMapToConfiguration(this.conf, map);
+      return this;
+    }
+
+    /**
+     * Set the key comparator class
+     *
+     * @param comparatorClassName the key comparator class name
+     * @return instance of the current builder
+     */
+    public Builder setKeyComparatorClass(String comparatorClassName) {
+      return this.setKeyComparatorClass(comparatorClassName, null);
+    }
+
+    /**
+     * Set the key comparator class and its associated configuration. This method should only be
+     * used if the comparator requires some specific configuration, which is typically not the
+     * case. {@link #setKeyComparatorClass(String)} is the preferred method for setting a
+     * comparator.
+     *
+     * @param comparatorClassName the key comparator class name
+     * @param comparatorConf      the comparator configuration. This can be null, and is a {@link
+     *                            java.util.Map} of key-value pairs. The keys should be limited to
+     *                            the ones required by the comparator.
+     * @return instance of the current builder
+     */
+    public Builder setKeyComparatorClass(String comparatorClassName,
+                                         @Nullable Map<String, String> comparatorConf) {
+      Preconditions.checkNotNull(comparatorClassName, "Comparator class name cannot be null");
+      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS,
+          comparatorClassName);
+      if (comparatorConf != null) {
+        // Merging the confs for now. Change to be specific in the future.
+        ConfigUtils.mergeConfsWithExclusions(this.conf, comparatorConf,
+            TezRuntimeConfiguration.getRuntimeConfigKeySet());
+      }
+      return this;
+    }
+
+    public Builder setCompression(boolean enabled, @Nullable String compressionCodec) {
+      this.conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, enabled);
+      if (enabled && compressionCodec != null) {
+        this.conf
+            .set(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, compressionCodec);
+      }
+      return this;
+    }
+
+    /**
+     * Set the key serialization class and the comparator to be used for sorting. The class is
+     * prepended to the value of IO_SERIALIZATIONS_KEY, or becomes the only entry when that key is
+     * unset. A custom serialization class can change the way keys need to be compared during
+     * sorting, so providing an invalid comparator here could create invalid results.
+     * @param serializationClassName the serialization class name; must not be null
+     * @param comparatorClassName the key comparator class name; must not be null
+     * @return instance of the current builder
+     */
+    public Builder setKeySerializationClass(String serializationClassName,
+        String comparatorClassName) {
+      Preconditions.checkArgument(serializationClassName != null,
+          "serializationClassName cannot be null");
+      Preconditions.checkArgument(comparatorClassName != null, "comparator cannot be null");
+      String existing = this.conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY);
+      this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
+          existing == null ? serializationClassName : serializationClassName + "," + existing);
+      setKeyComparatorClass(comparatorClassName, null);
+      return this;
+    }
+
+    /**
+     * Set serialization class responsible for providing serializer/deserializer for values. It
+     * is prepended to the value of IO_SERIALIZATIONS_KEY, or becomes the only entry when unset.
+     * @param serializationClassName the serialization class name; must not be null
+     * @return instance of the current builder
+     */
+    public Builder setValueSerializationClass(String serializationClassName) {
+      Preconditions.checkArgument(serializationClassName != null, "serializationClassName cannot be null");
+      String existing = this.conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY);
+      this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
+          existing == null ? serializationClassName : serializationClassName + "," + existing);
+      return this;
+    }
+
+    /**
+     * Create the actual configuration instance.
+     *
+     * @return an instance of the Configuration
+     */
+    public OrderedPartitionedKVOutputConfigurer build() {
+      return new OrderedPartitionedKVOutputConfigurer(this.conf);
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/tez/blob/469bf905/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledMergedInputConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledMergedInputConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledMergedInputConfigurer.java
deleted file mode 100644
index 5175cd0..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledMergedInputConfigurer.java
+++ /dev/null
@@ -1,492 +0,0 @@
-/*
- * *
- *  * Licensed to the Apache Software Foundation (ASF) under one
- *  * or more contributor license agreements.  See the NOTICE file
- *  * distributed with this work for additional information
- *  * regarding copyright ownership.  The ASF licenses this file
- *  * to you under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance
- *  * with the License.  You may obtain a copy of the License at
- *  *
- *  *     http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.tez.runtime.library.conf;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.util.Map;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
-import org.apache.tez.runtime.library.common.ConfigUtils;
-import org.apache.tez.runtime.library.input.ShuffledMergedInput;
-import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
-
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-/**
- * Configure {@link org.apache.tez.runtime.library.input.ShuffledMergedInput} </p>
- *
- * Values will be picked up from tez-site if not specified, otherwise defaults from
- * {@link org.apache.tez.runtime.library.api.TezRuntimeConfiguration} will be used.
- */
-public class ShuffledMergedInputConfigurer {
-
-  /**
-   * Configure parameters which are specific to the Input.
-   */
-  @InterfaceAudience.Private
-  public static interface SpecificConfigurer<T> extends BaseConfigurer<T> {
-
-    /**
-     * Specifies whether the legacy version of this input should be used.
-     * @return instance of the current builder
-     */
-    public T useLegacyInput();
-    /**
-     * Sets the buffer fraction, as a fraction of container size, to be used while fetching remote
-     * data.
-     *
-     * @param shuffleBufferFraction fraction of container size
-     * @return instance of the current builder
-     */
-    public T setShuffleBufferFraction(float shuffleBufferFraction);
-
-    /**
-     * Sets the buffer fraction, as a fraction of container size, to be used after the fetch and
-     * merge are complete. This buffer is used to cache merged data and avoids writing it out to
-     * disk.
-     *
-     * @param postMergeBufferFraction fraction of container size
-     * @return instance of the current builder
-     */
-    public T setPostMergeBufferFraction(float postMergeBufferFraction);
-
-    /**
-     * Sets a size limit on the maximum segment size to be shuffled to disk. This is a fraction of
-     * the shuffle buffer.
-     *
-     * @param maxSingleSegmentFraction fraction of memory determined by ShuffleBufferFraction
-     * @return instance of the current builder
-     */
-    public T setMaxSingleMemorySegmentFraction(float maxSingleSegmentFraction);
-
-    /**
-     * Enable the memory to memory merger
-     *
-     * @param enable whether to enable the memory to memory merger
-     * @return
-     */
-    public T setMemToMemMerger(boolean enable); // Not super useful until additional params are used.
-
-    /**
-     * Configure the point at which in memory segments will be merged. This is specified as a
-     * fraction of the shuffle buffer.
-     *
-     * @param mergeFraction fraction of memory determined by ShuffleBufferFraction, which when
-     *                      filled, will
-     *                      trigger a merge
-     * @return instance of the current builder
-     */
-    public T setMergeFraction(float mergeFraction);
-
-    /**
-     * Configure the combiner class
-     *
-     * @param combinerClassName the combiner class name
-     * @return instance of the current builder
-     */
-    public T setCombiner(String combinerClassName);
-
-    /**
-     * Configure the combiner class and it's associated configuration (specified as key-value
-     * pairs). This method should only be used if the combiner requires some specific configuration.
-     * {@link #setCombiner(String)} is the preferred method for setting a combiner.
-     *
-     * @param combinerClassName the combiner class name
-     * @param combinerConf      the combiner configuration. This can be null, and otherwise
-     *                          is a {@link java.util.Map} of key-value pairs. The keys should
-     *                          be limited to the ones required by the combiner.
-     * @return instance of the current builder
-     */
-    public T setCombiner(String combinerClassName, @Nullable Map<String, String> combinerConf);
-
-  }
-
-  @SuppressWarnings("rawtypes")
-  @InterfaceAudience.Public
-  @InterfaceStability.Evolving
-  public static class SpecificBuilder<E extends HadoopKeyValuesBasedBaseEdgeConfigurer.Builder> implements
-      SpecificConfigurer<SpecificBuilder> {
-
-    private final E edgeBuilder;
-    private final ShuffledMergedInputConfigurer.Builder builder;
-
-
-    @InterfaceAudience.Private
-    SpecificBuilder(E edgeBuilder, ShuffledMergedInputConfigurer.Builder builder) {
-      this.edgeBuilder = edgeBuilder;
-      this.builder = builder;
-    }
-
-    public SpecificBuilder<E> useLegacyInput() {
-      builder.useLegacyInput();
-      return this;
-    }
-
-    @Override
-    public SpecificBuilder<E> setShuffleBufferFraction(float shuffleBufferFraction) {
-      builder.setShuffleBufferFraction(shuffleBufferFraction);
-      return this;
-    }
-
-    @Override
-    public SpecificBuilder<E> setPostMergeBufferFraction(float postMergeBufferFraction) {
-      builder.setPostMergeBufferFraction(postMergeBufferFraction);
-      return this;
-    }
-
-    @Override
-    public SpecificBuilder<E> setMaxSingleMemorySegmentFraction(float maxSingleSegmentFraction) {
-      builder.setMaxSingleMemorySegmentFraction(maxSingleSegmentFraction);
-      return this;
-    }
-
-    @Override
-    public SpecificBuilder<E> setMemToMemMerger(boolean enable) {
-      builder.setMemToMemMerger(enable);
-      return this;
-    }
-
-    @Override
-    public SpecificBuilder<E> setMergeFraction(float mergeFraction) {
-      builder.setMergeFraction(mergeFraction);
-      return this;
-    }
-
-    @Override
-    public SpecificBuilder<E> setCombiner(String combinerClassName) {
-      return setCombiner(combinerClassName, null);
-    }
-
-    @Override
-    public SpecificBuilder<E> setCombiner(String combinerClassName, Map<String, String> combinerConf) {
-      builder.setCombiner(combinerClassName, combinerConf);
-      return this;
-    }
-
-
-    @Override
-    public SpecificBuilder setAdditionalConfiguration(String key, String value) {
-      builder.setAdditionalConfiguration(key, value);
-      return this;
-    }
-
-    @Override
-    public SpecificBuilder setAdditionalConfiguration(Map<String, String> confMap) {
-      builder.setAdditionalConfiguration(confMap);
-      return this;
-    }
-
-    @Override
-    public SpecificBuilder setFromConfiguration(Configuration conf) {
-      builder.setFromConfiguration(conf);
-      return this;
-    }
-
-    public E done() {
-      return edgeBuilder;
-    }
-
-  }
-
-  @InterfaceAudience.Private
-  @VisibleForTesting
-  Configuration conf;
-
-  private String inputClassName;
-
-  @InterfaceAudience.Private
-  @VisibleForTesting
-  ShuffledMergedInputConfigurer() {
-  }
-
-  private ShuffledMergedInputConfigurer(Configuration conf, boolean useLegacyInput) {
-    this.conf = conf;
-    if (useLegacyInput) {
-      inputClassName = ShuffledMergedInputLegacy.class.getName();
-    } else {
-      inputClassName = ShuffledMergedInput.class.getName();
-    }
-  }
-
-  /**
-   * Get a UserPayload representation of the Configuration
-   * @return a {@link org.apache.tez.dag.api.UserPayload} instance
-   */
-  public UserPayload toUserPayload() {
-    try {
-      return TezUtils.createUserPayloadFromConf(conf);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @InterfaceAudience.Private
-  public void fromUserPayload(UserPayload payload) {
-    try {
-      this.conf = TezUtils.createConfFromUserPayload(payload);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  public String getInputClassName() {
-    return inputClassName;
-  }
-
-  public static Builder newBuilder(String keyClass, String valueClass) {
-    return new Builder(keyClass, valueClass);
-  }
-
-  @InterfaceAudience.Public
-  @InterfaceStability.Evolving
-  public static class Builder implements SpecificConfigurer<Builder> {
-
-    private final Configuration conf = new Configuration(false);
-    private boolean useLegacyInput = false;
-
-    /**
-     * Create a configuration builder for {@link org.apache.tez.runtime.library.input.ShuffledMergedInput}
-     *
-     * @param keyClassName         the key class name
-     * @param valueClassName       the value class name
-     */
-    @InterfaceAudience.Private
-    Builder(String keyClassName, String valueClassName) {
-      this();
-      Preconditions.checkNotNull(keyClassName, "Key class name cannot be null");
-      Preconditions.checkNotNull(valueClassName, "Value class name cannot be null");
-      setKeyClassName(keyClassName);
-      setValueClassName(valueClassName);
-    }
-
-    @InterfaceAudience.Private
-    Builder() {
-      Map<String, String> tezDefaults = ConfigUtils
-          .extractConfigurationMap(TezRuntimeConfiguration.getTezRuntimeConfigDefaults(),
-              ShuffledMergedInput.getConfigurationKeySet());
-      ConfigUtils.addConfigMapToConfiguration(this.conf, tezDefaults);
-      ConfigUtils.addConfigMapToConfiguration(this.conf, TezRuntimeConfiguration.getOtherConfigDefaults());
-    }
-
-    @InterfaceAudience.Private
-    Builder setKeyClassName(String keyClassName) {
-      Preconditions.checkNotNull(keyClassName, "Key class name cannot be null");
-      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, keyClassName);
-      return this;
-    }
-
-    @InterfaceAudience.Private
-    Builder setValueClassName(String valueClassName) {
-      Preconditions.checkNotNull(valueClassName, "Value class name cannot be null");
-      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, valueClassName);
-      return this;
-    }
-
-    public Builder useLegacyInput() {
-      this.useLegacyInput = true;
-      return this;
-    }
-
-    @Override
-    public Builder setShuffleBufferFraction(float shuffleBufferFraction) {
-      this.conf
-          .setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, shuffleBufferFraction);
-      return this;
-    }
-
-    @Override
-    public Builder setPostMergeBufferFraction(float postMergeBufferFraction) {
-      this.conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_BUFFER_PERCENT, postMergeBufferFraction);
-      return this;
-    }
-
-    @Override
-    public Builder setMaxSingleMemorySegmentFraction(float maxSingleSegmentFraction) {
-      this.conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT,
-          maxSingleSegmentFraction);
-      return this;
-    }
-
-    @Override
-    public Builder setMemToMemMerger(boolean enable) {
-      this.conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM, enable);
-      return this;
-    }
-
-    @Override
-    public Builder setMergeFraction(float mergeFraction) {
-      this.conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, mergeFraction);
-      return this;
-    }
-
-    public Builder setCombiner(String combinerClassName) {
-      return setCombiner(combinerClassName, null);
-    }
-
-    @Override
-    public Builder setCombiner(String combinerClassName, Map<String, String> combinerConf) {
-      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS, combinerClassName);
-      if (combinerConf != null) {
-        // Merging the confs for now. Change to be specific in the future.
-        ConfigUtils.mergeConfsWithExclusions(this.conf, combinerConf,
-            TezRuntimeConfiguration.getRuntimeConfigKeySet());
-      }
-      return this;
-    }
-
-    /**
-     * Set the key comparator class
-     *
-     * @param comparatorClassName the key comparator class name
-     * @return instance of the current builder
-     */
-    public Builder setKeyComparatorClass(String comparatorClassName) {
-      return this.setKeyComparatorClass(comparatorClassName, null);
-    }
-
-    /**
-     * Set the key comparator class and its associated configuration. This method should only be
-     * used if the comparator requires some specific configuration, which is typically not the
-     * case. {@link #setKeyComparatorClass(String)} is the preferred method for setting a
-     * comparator.
-     *
-     * @param comparatorClassName the key comparator class name
-     * @param comparatorConf      the comparator configuration. This can be null, and is a {@link
-     *                            java.util.Map} of key-value pairs. The keys should be limited to
-     *                            the ones required by the comparator.
-     * @return instance of the current builder
-     */
-    public Builder setKeyComparatorClass(String comparatorClassName,
-                                         @Nullable Map<String, String> comparatorConf) {
-      Preconditions.checkNotNull(comparatorClassName, "Comparator class name cannot be null");
-      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS,
-          comparatorClassName);
-      if (comparatorConf != null) {
-        // Merging the confs for now. Change to be specific in the future.
-        ConfigUtils.mergeConfsWithExclusions(this.conf, comparatorConf,
-            TezRuntimeConfiguration.getRuntimeConfigKeySet());
-      }
-      return this;
-    }
-    @Override
-    public Builder setAdditionalConfiguration(String key, String value) {
-      Preconditions.checkNotNull(key, "Key cannot be null");
-      if (ConfigUtils.doesKeyQualify(key,
-          Lists.newArrayList(ShuffledMergedInput.getConfigurationKeySet(),
-              TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()),
-          TezRuntimeConfiguration.getAllowedPrefixes())) {
-        if (value == null) {
-          this.conf.unset(key);
-        } else {
-          this.conf.set(key, value);
-        }
-      }
-      return this;
-    }
-
-    @Override
-    public Builder setAdditionalConfiguration(Map<String, String> confMap) {
-      Preconditions.checkNotNull(confMap, "ConfMap cannot be null");
-      Map<String, String> map = ConfigUtils.extractConfigurationMap(confMap,
-          Lists.newArrayList(ShuffledMergedInput.getConfigurationKeySet(),
-              TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()), TezRuntimeConfiguration.getAllowedPrefixes());
-      ConfigUtils.addConfigMapToConfiguration(this.conf, map);
-      return this;
-    }
-
-    @Override
-    public Builder setFromConfiguration(Configuration conf) {
-      // Maybe ensure this is the first call ? Otherwise this can end up overriding other parameters
-      Preconditions.checkArgument(conf != null, "Configuration cannot be null");
-      Map<String, String> map = ConfigUtils.extractConfigurationMap(conf,
-          Lists.newArrayList(ShuffledMergedInput.getConfigurationKeySet(),
-              TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()), TezRuntimeConfiguration.getAllowedPrefixes());
-      ConfigUtils.addConfigMapToConfiguration(this.conf, map);
-      return this;
-    }
-
-    public Builder setCompression(boolean enabled, @Nullable String compressionCodec) {
-      this.conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, enabled);
-      if (enabled && compressionCodec != null) {
-        this.conf
-            .set(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, compressionCodec);
-      }
-      return this;
-    }
-
-    /**
-     * Set serialization class and the relevant comparator to be used for sorting.
-     * Providing a custom serialization class could change the way keys need to be compared in
-     * sorting. Providing an invalid comparator here could produce invalid results.
-     *
-     * @param serializationClassName the serialization class name
-     * @param comparatorClassName    the key comparator class name
-     * @return instance of the current builder
-     */
-    public Builder setKeySerializationClass(String serializationClassName,
-        String comparatorClassName) {
-      Preconditions.checkArgument(serializationClassName != null,
-          "serializationClassName cannot be null");
-      Preconditions.checkArgument(comparatorClassName != null,
-          "comparator cannot be null");
-      this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + ","
-          + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
-      setKeyComparatorClass(comparatorClassName, null);
-      return this;
-    }
-
-    /**
-     * Serialization class to be used for serializing values.
-     *
-     * @param serializationClassName the serialization class name
-     * @return instance of the current builder
-     */
-    public Builder setValueSerializationClass(String serializationClassName) {
-      Preconditions.checkArgument(serializationClassName != null,
-          "serializationClassName cannot be null");
-      this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + ","
-          + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
-      return this;
-    }
-
-    /**
-     * Create the actual configuration instance.
-     *
-     * @return a ShuffledMergedInputConfigurer holding the built Configuration
-     */
-    public ShuffledMergedInputConfigurer build() {
-      return new ShuffledMergedInputConfigurer(this.conf, this.useLegacyInput);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/469bf905/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledUnorderedKVInputConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledUnorderedKVInputConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledUnorderedKVInputConfigurer.java
deleted file mode 100644
index 628c40b..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledUnorderedKVInputConfigurer.java
+++ /dev/null
@@ -1,334 +0,0 @@
-/*
- * *
- *  * Licensed to the Apache Software Foundation (ASF) under one
- *  * or more contributor license agreements.  See the NOTICE file
- *  * distributed with this work for additional information
- *  * regarding copyright ownership.  The ASF licenses this file
- *  * to you under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance
- *  * with the License.  You may obtain a copy of the License at
- *  *
- *  *     http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.tez.runtime.library.conf;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.util.Map;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
-import org.apache.tez.runtime.library.common.ConfigUtils;
-import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
-
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-/**
- * Configure {@link org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput} <p>
- *
- * Values will be picked up from tez-site if not specified, otherwise defaults from
- * {@link org.apache.tez.runtime.library.api.TezRuntimeConfiguration} will be used.
- */
-public class ShuffledUnorderedKVInputConfigurer {
-
-  /**
-   * Configure parameters which are specific to the Input.
-   */
-  @InterfaceAudience.Private
-  public static interface SpecificConfigurer<T> extends BaseConfigurer<T> {
-
-    /**
-     * Sets the buffer fraction, as a fraction of container size, to be used while fetching remote
-     * data.
-     *
-     * @param shuffleBufferFraction fraction of container size
-     * @return instance of the current builder
-     */
-    public T setShuffleBufferFraction(float shuffleBufferFraction);
-
-    /**
-     * Sets a size limit on the maximum segment size to be shuffled to disk. This is a fraction of
-     * the shuffle buffer.
-     *
-     * @param maxSingleSegmentFraction fraction of memory determined by ShuffleBufferFraction
-     * @return instance of the current builder
-     */
-    public T setMaxSingleMemorySegmentFraction(float maxSingleSegmentFraction);
-
-    /**
-     * Configure the point at which in memory segments will be merged and written out to a single
-     * large disk segment. This is specified as a
-     * fraction of the shuffle buffer. <p> Has no effect at the moment.
-     *
-     * @param mergeFraction fraction of memory determined by ShuffleBufferFraction, which when
-     *                      filled, will
-     *                      trigger a merge
-     * @return instance of the current builder
-     */
-    public T setMergeFraction(float mergeFraction);
-
-  }
-
-  @SuppressWarnings("rawtypes")
-  @InterfaceAudience.Public
-  @InterfaceStability.Evolving
-  public static class SpecificBuilder<E extends HadoopKeyValuesBasedBaseEdgeConfigurer.Builder> implements
-      SpecificConfigurer<SpecificBuilder> {
-
-    private final E edgeBuilder;
-    private final ShuffledUnorderedKVInputConfigurer.Builder builder;
-
-
-    @InterfaceAudience.Private
-    SpecificBuilder(E edgeBuilder, ShuffledUnorderedKVInputConfigurer.Builder builder) {
-      this.edgeBuilder = edgeBuilder;
-      this.builder = builder;
-    }
-
-    @Override
-    public SpecificBuilder<E> setShuffleBufferFraction(float shuffleBufferFraction) {
-      builder.setShuffleBufferFraction(shuffleBufferFraction);
-      return this;
-    }
-
-    @Override
-    public SpecificBuilder<E> setMaxSingleMemorySegmentFraction(float maxSingleSegmentFraction) {
-      builder.setMaxSingleMemorySegmentFraction(maxSingleSegmentFraction);
-      return this;
-    }
-
-    @Override
-    public SpecificBuilder<E> setMergeFraction(float mergeFraction) {
-      builder.setMergeFraction(mergeFraction);
-      return this;
-    }
-
-    @Override
-    public SpecificBuilder setAdditionalConfiguration(String key, String value) {
-      builder.setAdditionalConfiguration(key, value);
-      return this;
-    }
-
-    @Override
-    public SpecificBuilder setAdditionalConfiguration(Map<String, String> confMap) {
-      builder.setAdditionalConfiguration(confMap);
-      return this;
-    }
-
-    @Override
-    public SpecificBuilder setFromConfiguration(Configuration conf) {
-      builder.setFromConfiguration(conf);
-      return this;
-    }
-
-    public E done() {
-      return edgeBuilder;
-    }
-
-  }
-
-  @InterfaceAudience.Private
-  @VisibleForTesting
-  Configuration conf;
-
-  @InterfaceAudience.Private
-  @VisibleForTesting
-  ShuffledUnorderedKVInputConfigurer() {
-  }
-
-  private ShuffledUnorderedKVInputConfigurer(Configuration conf) {
-    this.conf = conf;
-  }
-
-  /**
-   * Get a UserPayload representation of the Configuration
-   * @return a {@link org.apache.tez.dag.api.UserPayload} instance
-   */
-  public UserPayload toUserPayload() {
-    try {
-      return TezUtils.createUserPayloadFromConf(conf);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @InterfaceAudience.Private
-  public void fromUserPayload(UserPayload payload) {
-    try {
-      this.conf = TezUtils.createConfFromUserPayload(payload);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  public static Builder newBuilder(String keyClass, String valueClass) {
-    return new Builder(keyClass, valueClass);
-  }
-
-  @InterfaceAudience.Public
-  @InterfaceStability.Evolving
-  public static class Builder implements SpecificConfigurer<Builder> {
-
-    private final Configuration conf = new Configuration(false);
-
-    /**
-     * Create a configuration builder for {@link org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput}
-     *
-     * @param keyClassName         the key class name
-     * @param valueClassName       the value class name
-     */
-    @InterfaceAudience.Private
-    Builder(String keyClassName, String valueClassName) {
-      this();
-      Preconditions.checkNotNull(keyClassName, "Key class name cannot be null");
-      Preconditions.checkNotNull(valueClassName, "Value class name cannot be null");
-      setKeyClassName(keyClassName);
-      setValueClassName(valueClassName);
-    }
-
-    @InterfaceAudience.Private
-    Builder() {
-      Map<String, String> tezDefaults = ConfigUtils
-          .extractConfigurationMap(TezRuntimeConfiguration.getTezRuntimeConfigDefaults(),
-              ShuffledUnorderedKVInput.getConfigurationKeySet());
-      ConfigUtils.addConfigMapToConfiguration(this.conf, tezDefaults);
-      ConfigUtils.addConfigMapToConfiguration(this.conf, TezRuntimeConfiguration.getOtherConfigDefaults());
-    }
-
-    @InterfaceAudience.Private
-    Builder setKeyClassName(String keyClassName) {
-      Preconditions.checkNotNull(keyClassName, "Key class name cannot be null");
-      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, keyClassName);
-      return this;
-    }
-
-    @InterfaceAudience.Private
-    Builder setValueClassName(String valueClassName) {
-      Preconditions.checkNotNull(valueClassName, "Value class name cannot be null");
-      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, valueClassName);
-      return this;
-    }
-
-    @Override
-    public Builder setShuffleBufferFraction(float shuffleBufferFraction) {
-      this.conf
-          .setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, shuffleBufferFraction);
-      return this;
-    }
-
-    @Override
-    public Builder setMaxSingleMemorySegmentFraction(float maxSingleSegmentFraction) {
-      this.conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT,
-          maxSingleSegmentFraction);
-      return this;
-    }
-
-    @Override
-    public Builder setMergeFraction(float mergeFraction) {
-      this.conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, mergeFraction);
-      return this;
-    }
-
-    @Override
-    public Builder setAdditionalConfiguration(String key, String value) {
-      Preconditions.checkNotNull(key, "Key cannot be null");
-      if (ConfigUtils.doesKeyQualify(key,
-          Lists.newArrayList(ShuffledUnorderedKVInput.getConfigurationKeySet(),
-              TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()),
-          TezRuntimeConfiguration.getAllowedPrefixes())) {
-        if (value == null) {
-          this.conf.unset(key);
-        } else {
-          this.conf.set(key, value);
-        }
-      }
-      return this;
-    }
-
-    @Override
-    public Builder setAdditionalConfiguration(Map<String, String> confMap) {
-      Preconditions.checkNotNull(confMap, "ConfMap cannot be null");
-      Map<String, String> map = ConfigUtils.extractConfigurationMap(confMap,
-          Lists.newArrayList(ShuffledUnorderedKVInput.getConfigurationKeySet(),
-              TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()), TezRuntimeConfiguration.getAllowedPrefixes());
-      ConfigUtils.addConfigMapToConfiguration(this.conf, map);
-      return this;
-    }
-
-    @Override
-    public Builder setFromConfiguration(Configuration conf) {
-      // Maybe ensure this is the first call ? Otherwise this can end up overriding other parameters
-      Preconditions.checkArgument(conf != null, "Configuration cannot be null");
-      Map<String, String> map = ConfigUtils.extractConfigurationMap(conf,
-          Lists.newArrayList(ShuffledUnorderedKVInput.getConfigurationKeySet(),
-              TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()), TezRuntimeConfiguration.getAllowedPrefixes());
-      ConfigUtils.addConfigMapToConfiguration(this.conf, map);
-      return this;
-    }
-
-    public Builder setCompression(boolean enabled, @Nullable String compressionCodec) {
-      this.conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, enabled);
-      if (enabled && compressionCodec != null) {
-        this.conf
-            .set(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, compressionCodec);
-      }
-      return this;
-    }
-
-    /**
-     * Set serialization class responsible for providing serializer/deserializer for keys.
-     * The class name is prepended to the configured list of serializations.
-     *
-     * @param serializationClassName the serialization class name
-     * @return instance of the current builder
-     */
-    public Builder setKeySerializationClass(String serializationClassName) {
-      Preconditions.checkArgument(serializationClassName != null,
-          "serializationClassName cannot be null");
-      this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + ","
-          + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
-      return this;
-    }
-
-    /**
-     * Set serialization class responsible for providing serializer/deserializer for values.
-     *
-     * @param serializationClassName the serialization class name
-     * @return instance of the current builder
-     */
-    public Builder setValueSerializationClass(String serializationClassName) {
-      Preconditions.checkArgument(serializationClassName != null,
-          "serializationClassName cannot be null");
-      this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + ","
-          + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
-      return this;
-    }
-
-    /**
-     * Create the actual configuration instance.
-     *
-     * @return a ShuffledUnorderedKVInputConfigurer holding the built Configuration
-     */
-    public ShuffledUnorderedKVInputConfigurer build() {
-      return new ShuffledUnorderedKVInputConfigurer(this.conf);
-    }
-  }
-}


Mime
View raw message