tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [2/3] TEZ-1080. Add specific Configuration APIs for non MR based Inputs / Outputs. (sseth)
Date Mon, 07 Jul 2014 21:51:47 GMT
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3535b142/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledMergedInputConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledMergedInputConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledMergedInputConfiguration.java
new file mode 100644
index 0000000..6e9b93e
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledMergedInputConfiguration.java
@@ -0,0 +1,392 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.tez.runtime.library.conf;
+
+import java.io.IOException;
+import java.util.Map;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.input.ShuffledMergedInput;
+
+/**
+ * Builds the user payload for {@link ShuffledMergedInput}.
+ * <p/>
+ * Obtain a {@link Builder} via {@link #newBuilder(String, String)}, configure it, call
+ * {@link Builder#build()}, and serialize the result with {@link #toByteArray()}.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class ShuffledMergedInputConfiguration {
+
+  /**
+   * Configure parameters which are specific to the Input.
+   *
+   * @param <T> the builder type returned by every setter, so that calls can be chained
+   */
+  @InterfaceAudience.Private
+  public static interface SpecificConfigurer<T> extends BaseConfigurer<T> {
+
+    /**
+     * Sets the buffer fraction, as a fraction of container size, to be used while fetching remote
+     * data.
+     *
+     * @param shuffleBufferFraction fraction of container size
+     * @return instance of the current builder
+     */
+    public T setShuffleBufferFraction(float shuffleBufferFraction);
+
+    /**
+     * Sets the buffer fraction, as a fraction of container size, to be used after the fetch and
+     * merge are complete. This buffer is used to cache merged data and avoids writing it out to
+     * disk.
+     *
+     * @param postMergeBufferFraction fraction of container size
+     * @return instance of the current builder
+     */
+    public T setPostMergeBufferFraction(float postMergeBufferFraction);
+
+    /**
+     * Sets a limit on the size of a single segment that may be shuffled into memory. This is
+     * specified as a fraction of the shuffle buffer.
+     *
+     * @param maxSingleSegmentFraction fraction of memory determined by ShuffleBufferFraction
+     * @return instance of the current builder
+     */
+    public T setMaxSingleMemorySegmentFraction(float maxSingleSegmentFraction);
+
+    /**
+     * Enable the memory to memory merger.
+     *
+     * @return instance of the current builder
+     */
+    public T enableMemToMemMerger(); // Not super useful until additional params are used.
+
+    /**
+     * Configure the point at which in memory segments will be merged. This is specified as a
+     * fraction of the shuffle buffer.
+     *
+     * @param mergeFraction fraction of memory determined by ShuffleBufferFraction, which when
+     *                      filled, will trigger a merge
+     * @return instance of the current builder
+     */
+    public T setMergeFraction(float mergeFraction);
+
+    /**
+     * Enable encrypted data transfer.
+     *
+     * @return instance of the current builder
+     */
+    public T enableEncryptedTransfer();
+
+    /**
+     * Configure the combiner class.
+     *
+     * @param combinerClassName the combiner class name
+     * @param combinerConf configuration specific to the Combiner. This can be null
+     * @return instance of the current builder
+     */
+    public T setCombiner(String combinerClassName, Configuration combinerConf);
+
+  }
+
+  /**
+   * Input-specific configurer used when this Input is set up as part of an edge. Every setter
+   * delegates to the wrapped {@link Builder}; {@link #done()} hands control back to the edge
+   * builder.
+   *
+   * @param <E> the type of the edge builder returned by {@link #done()}
+   */
+  @SuppressWarnings("rawtypes")
+  @InterfaceAudience.Public
+  @InterfaceStability.Evolving
+  public static class SpecificBuilder<E extends HadoopKeyValuesBasedBaseConf.Builder> implements
+      SpecificConfigurer<SpecificBuilder> {
+
+    private final E edgeBuilder;
+    private final ShuffledMergedInputConfiguration.Builder builder;
+
+    @InterfaceAudience.Private
+    SpecificBuilder(E edgeBuilder, ShuffledMergedInputConfiguration.Builder builder) {
+      this.edgeBuilder = edgeBuilder;
+      this.builder = builder;
+    }
+
+    @Override
+    public SpecificBuilder<E> setShuffleBufferFraction(float shuffleBufferFraction) {
+      builder.setShuffleBufferFraction(shuffleBufferFraction);
+      return this;
+    }
+
+    @Override
+    public SpecificBuilder<E> setPostMergeBufferFraction(float postMergeBufferFraction) {
+      builder.setPostMergeBufferFraction(postMergeBufferFraction);
+      return this;
+    }
+
+    @Override
+    public SpecificBuilder<E> setMaxSingleMemorySegmentFraction(float maxSingleSegmentFraction) {
+      builder.setMaxSingleMemorySegmentFraction(maxSingleSegmentFraction);
+      return this;
+    }
+
+    @Override
+    public SpecificBuilder<E> enableMemToMemMerger() {
+      builder.enableMemToMemMerger();
+      return this;
+    }
+
+    @Override
+    public SpecificBuilder<E> setMergeFraction(float mergeFraction) {
+      builder.setMergeFraction(mergeFraction);
+      return this;
+    }
+
+    @Override
+    public SpecificBuilder<E> enableEncryptedTransfer() {
+      builder.enableEncryptedTransfer();
+      return this;
+    }
+
+    @Override
+    public SpecificBuilder<E> setCombiner(String combinerClassName, Configuration combinerConf) {
+      builder.setCombiner(combinerClassName, combinerConf);
+      return this;
+    }
+
+    // The three overrides below return SpecificBuilder<E> rather than the raw type so that a
+    // chained call to done() still yields E instead of its erasure.
+    @Override
+    public SpecificBuilder<E> setAdditionalConfiguration(String key, String value) {
+      builder.setAdditionalConfiguration(key, value);
+      return this;
+    }
+
+    @Override
+    public SpecificBuilder<E> setAdditionalConfiguration(Map<String, String> confMap) {
+      builder.setAdditionalConfiguration(confMap);
+      return this;
+    }
+
+    @Override
+    public SpecificBuilder<E> setFromConfiguration(Configuration conf) {
+      builder.setFromConfiguration(conf);
+      return this;
+    }
+
+    /**
+     * Finish configuring the Input and return to the edge builder.
+     *
+     * @return the edge builder this Input-specific builder was created with
+     */
+    public E done() {
+      return edgeBuilder;
+    }
+
+  }
+
+  @InterfaceAudience.Private
+  @VisibleForTesting
+  Configuration conf;
+
+  @InterfaceAudience.Private
+  @VisibleForTesting
+  ShuffledMergedInputConfiguration() {
+  }
+
+  private ShuffledMergedInputConfiguration(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Get a byte array representation of the configuration.
+   *
+   * @return a byte array which can be used as the payload
+   * @throws RuntimeException wrapping the {@link IOException} if serialization fails
+   */
+  public byte[] toByteArray() {
+    try {
+      return TezUtils.createUserPayloadFromConf(conf);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Replace the current configuration with one deserialized from a payload, for example one
+   * produced by {@link #toByteArray()}.
+   *
+   * @param payload the serialized configuration
+   * @throws RuntimeException wrapping the {@link IOException} if the payload cannot be read
+   */
+  public void fromByteArray(byte[] payload) {
+    try {
+      this.conf = TezUtils.createConfFromUserPayload(payload);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Create a builder for the {@link ShuffledMergedInput} payload.
+   *
+   * @param keyClass the key class name; must not be null
+   * @param valueClass the value class name; must not be null
+   * @return a new builder populated with the runtime defaults
+   */
+  public static Builder newBuilder(String keyClass, String valueClass) {
+    return new Builder(keyClass, valueClass);
+  }
+
+  /**
+   * Builder for {@link ShuffledMergedInputConfiguration}. It starts from the Tez runtime defaults
+   * for the keys returned by {@link ShuffledMergedInput#getConfigurationKeySet()}, plus
+   * {@link TezJobConfig#getOtherConfigDefaults()}.
+   */
+  @InterfaceAudience.Public
+  @InterfaceStability.Evolving
+  public static class Builder implements SpecificConfigurer<Builder> {
+
+    private final Configuration conf = new Configuration(false);
+
+    /**
+     * Create a configuration builder for {@link org.apache.tez.runtime.library.input.ShuffledMergedInput}
+     *
+     * @param keyClassName         the key class name
+     * @param valueClassName       the value class name
+     * @throws NullPointerException if either class name is null
+     */
+    @InterfaceAudience.Private
+    Builder(String keyClassName, String valueClassName) {
+      this();
+      // The setters perform the null checks.
+      setKeyClassName(keyClassName);
+      setValueClassName(valueClassName);
+    }
+
+    @InterfaceAudience.Private
+    Builder() {
+      Map<String, String> tezDefaults = ConfigUtils
+          .extractConfigurationMap(TezJobConfig.getTezRuntimeConfigDefaults(),
+              ShuffledMergedInput.getConfigurationKeySet());
+      ConfigUtils.addConfigMapToConfiguration(this.conf, tezDefaults);
+      ConfigUtils.addConfigMapToConfiguration(this.conf, TezJobConfig.getOtherConfigDefaults());
+    }
+
+    @InterfaceAudience.Private
+    Builder setKeyClassName(String keyClassName) {
+      Preconditions.checkNotNull(keyClassName, "Key class name cannot be null");
+      this.conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS, keyClassName);
+      return this;
+    }
+
+    @InterfaceAudience.Private
+    Builder setValueClassName(String valueClassName) {
+      Preconditions.checkNotNull(valueClassName, "Value class name cannot be null");
+      this.conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS, valueClassName);
+      return this;
+    }
+
+    @Override
+    public Builder setShuffleBufferFraction(float shuffleBufferFraction) {
+      this.conf
+          .setFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, shuffleBufferFraction);
+      return this;
+    }
+
+    @Override
+    public Builder setPostMergeBufferFraction(float postMergeBufferFraction) {
+      this.conf.setFloat(TezJobConfig.TEZ_RUNTIME_INPUT_BUFFER_PERCENT, postMergeBufferFraction);
+      return this;
+    }
+
+    @Override
+    public Builder setMaxSingleMemorySegmentFraction(float maxSingleSegmentFraction) {
+      this.conf.setFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT,
+          maxSingleSegmentFraction);
+      return this;
+    }
+
+    @Override
+    public Builder enableMemToMemMerger() {
+      this.conf.setBoolean(TezJobConfig.TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM, true);
+      return this;
+    }
+
+    @Override
+    public Builder setMergeFraction(float mergeFraction) {
+      this.conf.setFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, mergeFraction);
+      return this;
+    }
+
+    @Override
+    public Builder enableEncryptedTransfer() {
+      this.conf.setBoolean(TezJobConfig.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL, true);
+      return this;
+    }
+
+    @Override
+    public Builder setCombiner(String combinerClassName, Configuration combinerConf) {
+      this.conf.set(TezJobConfig.TEZ_RUNTIME_COMBINER_CLASS, combinerClassName);
+      if (combinerConf != null) {
+        // Merging the confs for now. Change to be specific in the future.
+        // Keys in the runtime config key set are excluded so the combiner cannot override them.
+        ConfigUtils.mergeConfsWithExclusions(this.conf, combinerConf,
+            TezJobConfig.getRuntimeConfigKeySet());
+      }
+      return this;
+    }
+
+    /**
+     * Set the key comparator class.
+     *
+     * @param comparatorClassName the key comparator class name
+     * @return instance of the current builder
+     */
+    public Builder setKeyComparatorClass(String comparatorClassName) {
+      this.conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS,
+          comparatorClassName);
+      return this;
+    }
+
+    /**
+     * Set a single additional configuration property. The key is applied only if
+     * {@link ConfigUtils#doesKeyQualify} accepts it against the Input's key set, the runtime
+     * additional key set and the allowed prefixes; otherwise the call is ignored.
+     *
+     * @param key the property name; must not be null
+     * @param value the property value, or null to unset the key
+     * @return instance of the current builder
+     */
+    @Override
+    public Builder setAdditionalConfiguration(String key, String value) {
+      Preconditions.checkNotNull(key, "Key cannot be null");
+      if (ConfigUtils.doesKeyQualify(key,
+          Lists.newArrayList(ShuffledMergedInput.getConfigurationKeySet(),
+              TezJobConfig.getRuntimeAdditionalConfigKeySet()),
+          TezJobConfig.getAllowedPrefixes())) {
+        if (value == null) {
+          this.conf.unset(key);
+        } else {
+          this.conf.set(key, value);
+        }
+      }
+      return this;
+    }
+
+    /**
+     * Set additional configuration properties. Only the entries selected by
+     * {@link ConfigUtils#extractConfigurationMap} for this Input's key sets and allowed prefixes
+     * are applied.
+     *
+     * @param confMap the properties to apply; must not be null
+     * @return instance of the current builder
+     */
+    @Override
+    public Builder setAdditionalConfiguration(Map<String, String> confMap) {
+      Preconditions.checkNotNull(confMap, "ConfMap cannot be null");
+      Map<String, String> map = ConfigUtils.extractConfigurationMap(confMap,
+          Lists.newArrayList(ShuffledMergedInput.getConfigurationKeySet(),
+              TezJobConfig.getRuntimeAdditionalConfigKeySet()), TezJobConfig.getAllowedPrefixes());
+      ConfigUtils.addConfigMapToConfiguration(this.conf, map);
+      return this;
+    }
+
+    /**
+     * Copy the relevant properties from an existing configuration. Only the entries selected by
+     * {@link ConfigUtils#extractConfigurationMap} for this Input's key sets and allowed prefixes
+     * are applied, and they overwrite any values already set on this builder.
+     *
+     * @param conf the source configuration; must not be null
+     * @return instance of the current builder
+     */
+    @Override
+    public Builder setFromConfiguration(Configuration conf) {
+      // Maybe ensure this is the first call ? Otherwise this can end up overriding other parameters
+      Preconditions.checkArgument(conf != null, "Configuration cannot be null");
+      Map<String, String> map = ConfigUtils.extractConfigurationMap(conf,
+          Lists.newArrayList(ShuffledMergedInput.getConfigurationKeySet(),
+              TezJobConfig.getRuntimeAdditionalConfigKeySet()), TezJobConfig.getAllowedPrefixes());
+      ConfigUtils.addConfigMapToConfiguration(this.conf, map);
+      return this;
+    }
+
+    /**
+     * Mark the input data as compressed.
+     *
+     * @param compressionCodec the compression codec to use; if null, only the compressed flag is
+     *                         set and any existing codec setting is left unchanged
+     * @return instance of the current builder
+     */
+    public Builder enableCompression(String compressionCodec) {
+      this.conf.setBoolean(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_IS_COMPRESSED, true);
+      if (compressionCodec != null) {
+        this.conf
+            .set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC, compressionCodec);
+      }
+      return this;
+    }
+
+    /**
+     * Create the actual configuration instance.
+     *
+     * @return an instance of the Configuration
+     */
+    public ShuffledMergedInputConfiguration build() {
+      return new ShuffledMergedInputConfiguration(this.conf);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3535b142/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledUnorderedKVInputConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledUnorderedKVInputConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledUnorderedKVInputConfiguration.java
new file mode 100644
index 0000000..7df9917
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledUnorderedKVInputConfiguration.java
@@ -0,0 +1,312 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.tez.runtime.library.conf;
+
+import java.io.IOException;
+import java.util.Map;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
+
+/**
+ * Builds the user payload for {@link ShuffledUnorderedKVInput}.
+ * <p/>
+ * Obtain a {@link Builder} via {@link #newBuilder(String, String)}, configure it, call
+ * {@link Builder#build()}, and serialize the result with {@link #toByteArray()}.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class ShuffledUnorderedKVInputConfiguration {
+
+  /**
+   * Configure parameters which are specific to the Input.
+   *
+   * @param <T> the builder type returned by every setter, so that calls can be chained
+   */
+  @InterfaceAudience.Private
+  public static interface SpecificConfigurer<T> extends BaseConfigurer<T> {
+
+    /**
+     * Sets the buffer fraction, as a fraction of container size, to be used while fetching remote
+     * data.
+     *
+     * @param shuffleBufferFraction fraction of container size
+     * @return instance of the current builder
+     */
+    public T setShuffleBufferFraction(float shuffleBufferFraction);
+
+    /**
+     * Sets a limit on the size of a single segment that may be shuffled into memory. This is
+     * specified as a fraction of the shuffle buffer.
+     *
+     * @param maxSingleSegmentFraction fraction of memory determined by ShuffleBufferFraction
+     * @return instance of the current builder
+     */
+    public T setMaxSingleMemorySegmentFraction(float maxSingleSegmentFraction);
+
+    /**
+     * Configure the point at which in memory segments will be merged and written out to a single
+     * large disk segment. This is specified as a fraction of the shuffle buffer.
+     * <p/>
+     * Has no effect at the moment.
+     *
+     * @param mergeFraction fraction of memory determined by ShuffleBufferFraction, which when
+     *                      filled, will trigger a merge
+     * @return instance of the current builder
+     */
+    public T setMergeFraction(float mergeFraction);
+
+    /**
+     * Enable encrypted data transfer.
+     *
+     * @return instance of the current builder
+     */
+    public T enableEncryptedTransfer();
+
+  }
+
+  /**
+   * Input-specific configurer used when this Input is set up as part of an edge. Every setter
+   * delegates to the wrapped {@link Builder}; {@link #done()} hands control back to the edge
+   * builder.
+   *
+   * @param <E> the type of the edge builder returned by {@link #done()}
+   */
+  @SuppressWarnings("rawtypes")
+  @InterfaceAudience.Public
+  @InterfaceStability.Evolving
+  public static class SpecificBuilder<E extends HadoopKeyValuesBasedBaseConf.Builder> implements
+      SpecificConfigurer<SpecificBuilder> {
+
+    private final E edgeBuilder;
+    private final ShuffledUnorderedKVInputConfiguration.Builder builder;
+
+    @InterfaceAudience.Private
+    SpecificBuilder(E edgeBuilder, ShuffledUnorderedKVInputConfiguration.Builder builder) {
+      this.edgeBuilder = edgeBuilder;
+      this.builder = builder;
+    }
+
+    @Override
+    public SpecificBuilder<E> setShuffleBufferFraction(float shuffleBufferFraction) {
+      builder.setShuffleBufferFraction(shuffleBufferFraction);
+      return this;
+    }
+
+    @Override
+    public SpecificBuilder<E> setMaxSingleMemorySegmentFraction(float maxSingleSegmentFraction) {
+      builder.setMaxSingleMemorySegmentFraction(maxSingleSegmentFraction);
+      return this;
+    }
+
+    @Override
+    public SpecificBuilder<E> setMergeFraction(float mergeFraction) {
+      builder.setMergeFraction(mergeFraction);
+      return this;
+    }
+
+    @Override
+    public SpecificBuilder<E> enableEncryptedTransfer() {
+      builder.enableEncryptedTransfer();
+      return this;
+    }
+
+    // The three overrides below return SpecificBuilder<E> rather than the raw type so that a
+    // chained call to done() still yields E instead of its erasure.
+    @Override
+    public SpecificBuilder<E> setAdditionalConfiguration(String key, String value) {
+      builder.setAdditionalConfiguration(key, value);
+      return this;
+    }
+
+    @Override
+    public SpecificBuilder<E> setAdditionalConfiguration(Map<String, String> confMap) {
+      builder.setAdditionalConfiguration(confMap);
+      return this;
+    }
+
+    @Override
+    public SpecificBuilder<E> setFromConfiguration(Configuration conf) {
+      builder.setFromConfiguration(conf);
+      return this;
+    }
+
+    /**
+     * Finish configuring the Input and return to the edge builder.
+     *
+     * @return the edge builder this Input-specific builder was created with
+     */
+    public E done() {
+      return edgeBuilder;
+    }
+
+  }
+
+  @InterfaceAudience.Private
+  @VisibleForTesting
+  Configuration conf;
+
+  @InterfaceAudience.Private
+  @VisibleForTesting
+  ShuffledUnorderedKVInputConfiguration() {
+  }
+
+  private ShuffledUnorderedKVInputConfiguration(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Get a byte array representation of the configuration.
+   *
+   * @return a byte array which can be used as the payload
+   * @throws RuntimeException wrapping the {@link IOException} if serialization fails
+   */
+  public byte[] toByteArray() {
+    try {
+      return TezUtils.createUserPayloadFromConf(conf);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Replace the current configuration with one deserialized from a payload, for example one
+   * produced by {@link #toByteArray()}.
+   *
+   * @param payload the serialized configuration
+   * @throws RuntimeException wrapping the {@link IOException} if the payload cannot be read
+   */
+  public void fromByteArray(byte[] payload) {
+    try {
+      this.conf = TezUtils.createConfFromUserPayload(payload);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Create a builder for the {@link ShuffledUnorderedKVInput} payload.
+   *
+   * @param keyClass the key class name; must not be null
+   * @param valueClass the value class name; must not be null
+   * @return a new builder populated with the runtime defaults
+   */
+  public static Builder newBuilder(String keyClass, String valueClass) {
+    return new Builder(keyClass, valueClass);
+  }
+
+  /**
+   * Builder for {@link ShuffledUnorderedKVInputConfiguration}. It starts from the Tez runtime
+   * defaults for the keys returned by {@link ShuffledUnorderedKVInput#getConfigurationKeySet()},
+   * plus {@link TezJobConfig#getOtherConfigDefaults()}.
+   */
+  @InterfaceAudience.Public
+  @InterfaceStability.Evolving
+  public static class Builder implements SpecificConfigurer<Builder> {
+
+    private final Configuration conf = new Configuration(false);
+
+    /**
+     * Create a configuration builder for {@link org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput}
+     *
+     * @param keyClassName         the key class name
+     * @param valueClassName       the value class name
+     * @throws NullPointerException if either class name is null
+     */
+    @InterfaceAudience.Private
+    Builder(String keyClassName, String valueClassName) {
+      this();
+      // The setters perform the null checks.
+      setKeyClassName(keyClassName);
+      setValueClassName(valueClassName);
+    }
+
+    @InterfaceAudience.Private
+    Builder() {
+      Map<String, String> tezDefaults = ConfigUtils
+          .extractConfigurationMap(TezJobConfig.getTezRuntimeConfigDefaults(),
+              ShuffledUnorderedKVInput.getConfigurationKeySet());
+      ConfigUtils.addConfigMapToConfiguration(this.conf, tezDefaults);
+      ConfigUtils.addConfigMapToConfiguration(this.conf, TezJobConfig.getOtherConfigDefaults());
+    }
+
+    @InterfaceAudience.Private
+    Builder setKeyClassName(String keyClassName) {
+      Preconditions.checkNotNull(keyClassName, "Key class name cannot be null");
+      this.conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS, keyClassName);
+      return this;
+    }
+
+    @InterfaceAudience.Private
+    Builder setValueClassName(String valueClassName) {
+      Preconditions.checkNotNull(valueClassName, "Value class name cannot be null");
+      this.conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS, valueClassName);
+      return this;
+    }
+
+    @Override
+    public Builder setShuffleBufferFraction(float shuffleBufferFraction) {
+      this.conf
+          .setFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, shuffleBufferFraction);
+      return this;
+    }
+
+    @Override
+    public Builder setMaxSingleMemorySegmentFraction(float maxSingleSegmentFraction) {
+      this.conf.setFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT,
+          maxSingleSegmentFraction);
+      return this;
+    }
+
+    @Override
+    public Builder setMergeFraction(float mergeFraction) {
+      this.conf.setFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, mergeFraction);
+      return this;
+    }
+
+    @Override
+    public Builder enableEncryptedTransfer() {
+      this.conf.setBoolean(TezJobConfig.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL, true);
+      return this;
+    }
+
+    /**
+     * Set a single additional configuration property. The key is applied only if
+     * {@link ConfigUtils#doesKeyQualify} accepts it against the Input's key set, the runtime
+     * additional key set and the allowed prefixes; otherwise the call is ignored.
+     *
+     * @param key the property name; must not be null
+     * @param value the property value, or null to unset the key
+     * @return instance of the current builder
+     */
+    @Override
+    public Builder setAdditionalConfiguration(String key, String value) {
+      Preconditions.checkNotNull(key, "Key cannot be null");
+      if (ConfigUtils.doesKeyQualify(key,
+          Lists.newArrayList(ShuffledUnorderedKVInput.getConfigurationKeySet(),
+              TezJobConfig.getRuntimeAdditionalConfigKeySet()),
+          TezJobConfig.getAllowedPrefixes())) {
+        if (value == null) {
+          this.conf.unset(key);
+        } else {
+          this.conf.set(key, value);
+        }
+      }
+      return this;
+    }
+
+    /**
+     * Set additional configuration properties. Only the entries selected by
+     * {@link ConfigUtils#extractConfigurationMap} for this Input's key sets and allowed prefixes
+     * are applied.
+     *
+     * @param confMap the properties to apply; must not be null
+     * @return instance of the current builder
+     */
+    @Override
+    public Builder setAdditionalConfiguration(Map<String, String> confMap) {
+      Preconditions.checkNotNull(confMap, "ConfMap cannot be null");
+      Map<String, String> map = ConfigUtils.extractConfigurationMap(confMap,
+          Lists.newArrayList(ShuffledUnorderedKVInput.getConfigurationKeySet(),
+              TezJobConfig.getRuntimeAdditionalConfigKeySet()), TezJobConfig.getAllowedPrefixes());
+      ConfigUtils.addConfigMapToConfiguration(this.conf, map);
+      return this;
+    }
+
+    /**
+     * Copy the relevant properties from an existing configuration. Only the entries selected by
+     * {@link ConfigUtils#extractConfigurationMap} for this Input's key sets and allowed prefixes
+     * are applied, and they overwrite any values already set on this builder.
+     *
+     * @param conf the source configuration; must not be null
+     * @return instance of the current builder
+     */
+    @Override
+    public Builder setFromConfiguration(Configuration conf) {
+      // Maybe ensure this is the first call ? Otherwise this can end up overriding other parameters
+      Preconditions.checkArgument(conf != null, "Configuration cannot be null");
+      Map<String, String> map = ConfigUtils.extractConfigurationMap(conf,
+          Lists.newArrayList(ShuffledUnorderedKVInput.getConfigurationKeySet(),
+              TezJobConfig.getRuntimeAdditionalConfigKeySet()), TezJobConfig.getAllowedPrefixes());
+      ConfigUtils.addConfigMapToConfiguration(this.conf, map);
+      return this;
+    }
+
+    /**
+     * Mark the input data as compressed.
+     *
+     * @param compressionCodec the compression codec to use; if null, only the compressed flag is
+     *                         set and any existing codec setting is left unchanged
+     * @return instance of the current builder
+     */
+    public Builder enableCompression(String compressionCodec) {
+      this.conf.setBoolean(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_IS_COMPRESSED, true);
+      if (compressionCodec != null) {
+        this.conf
+            .set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC, compressionCodec);
+      }
+      return this;
+    }
+
+    /**
+     * Create the actual configuration instance.
+     *
+     * @return an instance of the Configuration
+     */
+    public ShuffledUnorderedKVInputConfiguration build() {
+      return new ShuffledUnorderedKVInputConfiguration(this.conf);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3535b142/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfiguration.java
new file mode 100644
index 0000000..4a40346
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfiguration.java
@@ -0,0 +1,191 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.tez.runtime.library.conf;
+
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
+import org.apache.tez.runtime.library.output.OnFileUnorderedPartitionedKVOutput;
+
+/**
+ * Configure payloads for the OnFileUnorderedPartitionedKVOutput and ShuffledUnorderedKVInput pair.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class UnorderedPartitionedKVEdgeConfiguration extends HadoopKeyValuesBasedBaseConf {
+
+  private final OnFileUnorderedPartitionedKVOutputConfiguration outputConf;
+  private final ShuffledUnorderedKVInputConfiguration inputConf;
+
+  private UnorderedPartitionedKVEdgeConfiguration(
+      OnFileUnorderedPartitionedKVOutputConfiguration outputConfiguration,
+      ShuffledUnorderedKVInputConfiguration inputConfiguration) {
+    this.outputConf = outputConfiguration;
+    this.inputConf = inputConfiguration;
+  }
+
+  /**
+   * Create a builder to configure the relevant Input and Output.
+   *
+   * @param keyClassName the key class name
+   * @param valueClassName the value class name
+   * @return a builder to configure the edge
+   */
+  public static Builder newBuilder(String keyClassName, String valueClassName) {
+    return new Builder(keyClassName, valueClassName);
+  }
+
+  @Override
+  public byte[] getOutputPayload() {
+    return outputConf.toByteArray();
+  }
+
+  @Override
+  public String getOutputClassName() {
+    return OnFileUnorderedPartitionedKVOutput.class.getName();
+  }
+
+  @Override
+  public byte[] getInputPayload() {
+    return inputConf.toByteArray();
+  }
+
+  @Override
+  public String getInputClassName() {
+    return ShuffledUnorderedKVInput.class.getName();
+  }
+
+  /**
+   * This is a convenience method for the typical usage of this edge, and creates an instance of
+   * {@link org.apache.tez.dag.api.EdgeProperty} which is likely to be used.
+   * <p/>
+   * If custom edge properties are required, the methods to get the relevant payloads should be
+   * used.
+   * <p/>
+   * In this case - DataMovementType.SCATTER_GATHER, EdgeProperty.DataSourceType.PERSISTED,
+   * EdgeProperty.SchedulingType.SEQUENTIAL
+   *
+   * @return an {@link org.apache.tez.dag.api.EdgeProperty} instance
+   */
+  public EdgeProperty createDefaultEdgeProperty() {
+    return new EdgeProperty(EdgeProperty.DataMovementType.SCATTER_GATHER,
+        EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL,
+        new OutputDescriptor(getOutputClassName()).setUserPayload(getOutputPayload()),
+        new InputDescriptor(getInputClassName()).setUserPayload(getInputPayload()));
+  }
+
+  /**
+   * Builder for {@link UnorderedPartitionedKVEdgeConfiguration}. Edge-wide settings are applied to
+   * both the Output and the Input builders. {@link #configureOutput(String, Configuration)} must be
+   * called before {@link #build()}.
+   */
+  @InterfaceAudience.Public
+  @InterfaceStability.Evolving
+  public static class Builder extends HadoopKeyValuesBasedBaseConf.Builder<Builder> {
+
+    // Set by configureOutput(); build() requires it because a partitioner is mandatory.
+    private boolean outputConfigured = false;
+
+    private final OnFileUnorderedPartitionedKVOutputConfiguration.Builder outputBuilder =
+        new OnFileUnorderedPartitionedKVOutputConfiguration.Builder();
+    private final OnFileUnorderedPartitionedKVOutputConfiguration.SpecificBuilder<UnorderedPartitionedKVEdgeConfiguration.Builder>
+        specificOutputBuilder =
+        new OnFileUnorderedPartitionedKVOutputConfiguration.SpecificBuilder<UnorderedPartitionedKVEdgeConfiguration.Builder>(
+            this, outputBuilder);
+
+    private final ShuffledUnorderedKVInputConfiguration.Builder inputBuilder =
+        new ShuffledUnorderedKVInputConfiguration.Builder();
+    private final ShuffledUnorderedKVInputConfiguration.SpecificBuilder<UnorderedPartitionedKVEdgeConfiguration.Builder>
+        specificInputBuilder =
+        new ShuffledUnorderedKVInputConfiguration.SpecificBuilder<UnorderedPartitionedKVEdgeConfiguration.Builder>(
+            this, inputBuilder);
+
+    /**
+     * @param keyClassName the key class name, applied to both the Output and the Input
+     * @param valueClassName the value class name, applied to both the Output and the Input
+     */
+    @InterfaceAudience.Private
+    Builder(String keyClassName, String valueClassName) {
+      outputBuilder.setKeyClassName(keyClassName);
+      outputBuilder.setValueClassName(valueClassName);
+      inputBuilder.setKeyClassName(keyClassName);
+      inputBuilder.setValueClassName(valueClassName);
+    }
+
+    @Override
+    public Builder enableCompression(String compressionCodec) {
+      outputBuilder.enableCompression(compressionCodec);
+      inputBuilder.enableCompression(compressionCodec);
+      return this;
+    }
+
+    @Override
+    public Builder setAdditionalConfiguration(String key, String value) {
+      outputBuilder.setAdditionalConfiguration(key, value);
+      inputBuilder.setAdditionalConfiguration(key, value);
+      return this;
+    }
+
+    @Override
+    public Builder setAdditionalConfiguration(Map<String, String> confMap) {
+      outputBuilder.setAdditionalConfiguration(confMap);
+      inputBuilder.setAdditionalConfiguration(confMap);
+      return this;
+    }
+
+    @Override
+    public Builder setFromConfiguration(Configuration conf) {
+      outputBuilder.setFromConfiguration(conf);
+      inputBuilder.setFromConfiguration(conf);
+      return this;
+    }
+
+    /**
+     * Configure the specific output. This must be called before {@link #build()}.
+     *
+     * @param partitionerClassName the partitioner class name
+     * @param partitionerConf configuration for the partitioner. This can be null
+     * @return a builder to configure the output
+     */
+    public OnFileUnorderedPartitionedKVOutputConfiguration.SpecificBuilder<Builder> configureOutput(
+        String partitionerClassName, Configuration partitionerConf) {
+      outputConfigured = true;
+      outputBuilder.setPartitioner(partitionerClassName, partitionerConf);
+      return specificOutputBuilder;
+    }
+
+    /**
+     * Configure the specific input.
+     *
+     * @return a builder to configure the input
+     */
+    public ShuffledUnorderedKVInputConfiguration.SpecificBuilder<Builder> configureInput() {
+      return specificInputBuilder;
+    }
+
+    /**
+     * Build and return an instance of the configuration.
+     *
+     * @return an instance of the actual configuration
+     * @throws IllegalStateException if {@link #configureOutput(String, Configuration)} was never
+     *                               called
+     */
+    public UnorderedPartitionedKVEdgeConfiguration build() {
+      Preconditions
+          .checkState(outputConfigured, "Output must be configured - partitioner required");
+      return new UnorderedPartitionedKVEdgeConfiguration(outputBuilder.build(), inputBuilder.build());
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3535b142/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedUnpartitionedKVEdgeConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedUnpartitionedKVEdgeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedUnpartitionedKVEdgeConfiguration.java
new file mode 100644
index 0000000..5699043
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedUnpartitionedKVEdgeConfiguration.java
@@ -0,0 +1,200 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.tez.runtime.library.conf;
+
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
+import org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput;
+
+/**
+ * Configure payloads for the OnFileUnorderedKVOutput and ShuffledUnorderedKVInput pair.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class UnorderedUnpartitionedKVEdgeConfiguration extends HadoopKeyValuesBasedBaseConf {
+  private final OnFileUnorderedKVOutputConfiguration outputConf;
+  private final ShuffledUnorderedKVInputConfiguration inputConf;
+
+  private UnorderedUnpartitionedKVEdgeConfiguration(
+      OnFileUnorderedKVOutputConfiguration outputConfiguration,
+      ShuffledUnorderedKVInputConfiguration inputConfiguration) {
+    this.outputConf = outputConfiguration;
+    this.inputConf = inputConfiguration;
+  }
+
+  /**
+   * Create a builder to configure the relevant Input and Output
+   * @param keyClassName the key class name
+   * @param valueClassName the value class name
+   * @return a builder to configure the edge
+   */
+  public static Builder newBuilder(String keyClassName, String valueClassName) {
+    return new Builder(keyClassName, valueClassName);
+  }
+
+  @Override
+  public byte[] getOutputPayload() {
+    return outputConf.toByteArray();
+  }
+
+  @Override
+  public String getOutputClassName() {
+    return OnFileUnorderedKVOutput.class.getName();
+  }
+
+  @Override
+  public byte[] getInputPayload() {
+    return inputConf.toByteArray();
+  }
+
+  @Override
+  public String getInputClassName() {
+    return ShuffledUnorderedKVInput.class.getName();
+  }
+
+  /**
+   * This is a convenience method for the typical usage of this edge, and creates an instance of
+   * {@link org.apache.tez.dag.api.EdgeProperty} which is likely to be used.
+   * <p>If custom edge properties are required, build them from {@link #getOutputPayload()},
+   * {@link #getInputPayload()} and the matching class names instead.
+   * <p>The returned property uses DataMovementType.BROADCAST, DataSourceType.PERSISTED and
+   * SchedulingType.SEQUENTIAL.
+   *
+   * @return an {@link org.apache.tez.dag.api.EdgeProperty} instance
+   */
+  public EdgeProperty createDefaultBroadcastEdgeProperty() {
+    EdgeProperty edgeProperty = new EdgeProperty(EdgeProperty.DataMovementType.BROADCAST,
+        EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL,
+        new OutputDescriptor(
+            getOutputClassName()).setUserPayload(getOutputPayload()),
+        new InputDescriptor(
+            getInputClassName()).setUserPayload(getInputPayload()));
+    return edgeProperty;
+  }
+
+  /**
+   * This is a convenience method for the typical usage of this edge, and creates an instance of
+   * {@link org.apache.tez.dag.api.EdgeProperty} which is likely to be used.
+   * <p>If custom edge properties are required, build them from {@link #getOutputPayload()},
+   * {@link #getInputPayload()} and the matching class names instead.
+   * <p>The returned property uses DataMovementType.ONE_TO_ONE, DataSourceType.PERSISTED and
+   * SchedulingType.SEQUENTIAL.
+   *
+   * @return an {@link org.apache.tez.dag.api.EdgeProperty} instance
+   */
+  public EdgeProperty createDefaultOneToOneEdgeProperty() {
+    EdgeProperty edgeProperty = new EdgeProperty(EdgeProperty.DataMovementType.ONE_TO_ONE,
+        EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL,
+        new OutputDescriptor(
+            getOutputClassName()).setUserPayload(getOutputPayload()),
+        new InputDescriptor(
+            getInputClassName()).setUserPayload(getInputPayload()));
+    return edgeProperty;
+  }
+
+  /** Edge configuration builder; obtain instances via newBuilder(String, String). */
+  @InterfaceAudience.Public
+  @InterfaceStability.Evolving
+  public static class Builder extends HadoopKeyValuesBasedBaseConf.Builder<Builder> {
+
+    private final OnFileUnorderedKVOutputConfiguration.Builder outputBuilder =
+        new OnFileUnorderedKVOutputConfiguration.Builder();
+    private final OnFileUnorderedKVOutputConfiguration.SpecificBuilder<UnorderedUnpartitionedKVEdgeConfiguration.Builder>
+        specificOutputBuilder =
+        new OnFileUnorderedKVOutputConfiguration.SpecificBuilder<UnorderedUnpartitionedKVEdgeConfiguration.Builder>(
+            this, outputBuilder);
+
+    private final ShuffledUnorderedKVInputConfiguration.Builder inputBuilder =
+        new ShuffledUnorderedKVInputConfiguration.Builder();
+    private final ShuffledUnorderedKVInputConfiguration.SpecificBuilder<UnorderedUnpartitionedKVEdgeConfiguration.Builder>
+        specificInputBuilder =
+        new ShuffledUnorderedKVInputConfiguration.SpecificBuilder<UnorderedUnpartitionedKVEdgeConfiguration.Builder>(
+            this, inputBuilder);
+
+    @InterfaceAudience.Private
+    Builder(String keyClassName, String valueClassName) {
+      outputBuilder.setKeyClassName(keyClassName);
+      outputBuilder.setValueClassName(valueClassName);
+      inputBuilder.setKeyClassName(keyClassName);
+      inputBuilder.setValueClassName(valueClassName);
+    }
+
+    @Override
+    public Builder enableCompression(String compressionCodec) {
+      outputBuilder.enableCompression(compressionCodec);
+      inputBuilder.enableCompression(compressionCodec);
+      return this;
+    }
+
+    @Override
+    public Builder setAdditionalConfiguration(String key, String value) {
+      outputBuilder.setAdditionalConfiguration(key, value);
+      inputBuilder.setAdditionalConfiguration(key, value);
+      return this;
+    }
+
+    @Override
+    public Builder setAdditionalConfiguration(Map<String, String> confMap) {
+      outputBuilder.setAdditionalConfiguration(confMap);
+      inputBuilder.setAdditionalConfiguration(confMap);
+      return this;
+    }
+
+    @Override
+    public Builder setFromConfiguration(Configuration conf) {
+      outputBuilder.setFromConfiguration(conf);
+      inputBuilder.setFromConfiguration(conf);
+      return this;
+    }
+
+    /**
+     * Configure the specific output
+     * @return a builder to configure the output
+     */
+    public OnFileUnorderedKVOutputConfiguration.SpecificBuilder<Builder> configureOutput() {
+      return specificOutputBuilder;
+    }
+
+    /**
+     * Configure the specific input
+     * @return a builder to configure the input
+     */
+    public ShuffledUnorderedKVInputConfiguration.SpecificBuilder<Builder> configureInput() {
+      return specificInputBuilder;
+    }
+
+    /**
+     * Build and return an instance of the configuration
+     * @return an instance of the actual configuration
+     */
+    public UnorderedUnpartitionedKVEdgeConfiguration build() {
+      return new UnorderedUnpartitionedKVEdgeConfiguration(outputBuilder.build(), inputBuilder.build());
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3535b142/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
index 42b2764..ce79124 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
@@ -18,15 +18,14 @@
 package org.apache.tez.runtime.library.input;
 
 import java.io.IOException;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
+import java.util.*;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.tez.common.TezJobConfig;
@@ -72,7 +71,7 @@ public class ShuffledMergedInput extends AbstractLogicalInput {
 
   private TezCounter inputKeyCounter;
   private TezCounter inputValueCounter;
-  
+
   private final AtomicBoolean isStarted = new AtomicBoolean(false);
 
   @Override
@@ -87,7 +86,7 @@ public class ShuffledMergedInput extends AbstractLogicalInput {
           + getContext().getSourceVertexName());
       return Collections.emptyList();
     }
-    
+
     long initialMemoryRequest = Shuffle.getInitialMemoryRequirement(conf,
         getContext().getTotalMemoryAvailableToTask());
     this.memoryUpdateCallbackHandler = new MemoryUpdateCallbackHandler();
@@ -100,7 +99,7 @@ public class ShuffledMergedInput extends AbstractLogicalInput {
 
     return Collections.emptyList();
   }
-  
+
   @Override
   public synchronized void start() throws IOException {
     if (!isStarted.get()) {
@@ -128,8 +127,8 @@ public class ShuffledMergedInput extends AbstractLogicalInput {
    * @return true if the input is ready for consumption, or if an error occurred
    *         processing fetching the input. false if the shuffle and merge are
    *         still in progress
-   * @throws InterruptedException 
-   * @throws IOException 
+   * @throws InterruptedException
+   * @throws IOException
    */
   public synchronized boolean isInputReady() throws IOException, InterruptedException {
     Preconditions.checkState(isStarted.get(), "Must start input before invoking this method");
@@ -154,7 +153,7 @@ public class ShuffledMergedInput extends AbstractLogicalInput {
       }
       localShuffleCopy = shuffle;
     }
-    
+
     TezRawKeyValueIterator localRawIter = localShuffleCopy.waitForInput();
     synchronized(this) {
       rawIter = localRawIter;
@@ -284,4 +283,52 @@ public class ShuffledMergedInput extends AbstractLogicalInput {
       return valuesIter.getValues();
     }
   };
+
+
+  private static final Set<String> confKeys = new HashSet<String>();
+  // Populated exactly once by the static initializer; only exposed as an unmodifiable view.
+  static {
+    Collections.addAll(confKeys, TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
+        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES,
+        TezJobConfig.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE,
+        TezJobConfig.TEZ_RUNTIME_IO_SORT_FACTOR,
+        TezJobConfig.TEZ_RUNTIME_COMBINE_MIN_SPILLS,
+        TezJobConfig.TEZ_RUNTIME_COMBINER_CLASS,
+        TezJobConfig.TEZ_RUNTIME_COUNTERS_MAX_KEY,
+        TezJobConfig.TEZ_RUNTIME_COUNTER_GROUP_NAME_MAX_KEY,
+        TezJobConfig.TEZ_RUNTIME_COUNTER_NAME_MAX_KEY,
+        TezJobConfig.TEZ_RUNTIME_COUNTER_GROUPS_MAX_KEY,
+        TezJobConfig.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES,
+        TezJobConfig.TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT,
+        TezJobConfig.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE,
+        TezJobConfig.TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR,
+        TezJobConfig.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT,
+        TezJobConfig.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_ENABLED,
+        TezJobConfig.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_MAX_CONNECTIONS,
+        TezJobConfig.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT,
+        TezJobConfig.TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE,
+        TezJobConfig.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL,
+        TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT,
+        TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT,
+        TezJobConfig.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT,
+        TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMTOMEM_SEGMENTS,
+        TezJobConfig.TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM,
+        TezJobConfig.TEZ_RUNTIME_INPUT_BUFFER_PERCENT,
+        TezJobConfig.TEZ_RUNTIME_GROUP_COMPARATOR_CLASS,
+        TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS,
+        TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS,
+        TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS,
+        TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_IS_COMPRESSED,
+        TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC,
+        TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_SECONDARY_COMPARATOR_CLASS);
+  }
+
+  // TODO Maybe add helper methods to extract keys
+  // TODO Maybe add constants or an Enum to access the keys
+  /** Returns an unmodifiable view of the configuration keys registered for this Input. */
+  @InterfaceAudience.Private
+  public static Set<String> getConfigurationKeySet() {
+    return Collections.unmodifiableSet(confKeys);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3535b142/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
index dd14a82..24d9bc4 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
@@ -19,15 +19,14 @@ git diff * Licensed to the Apache Software Foundation (ASF) under one
 package org.apache.tez.runtime.library.input;
 
 import java.io.IOException;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
+import java.util.*;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
@@ -53,7 +52,7 @@ import com.google.common.base.Preconditions;
 public class ShuffledUnorderedKVInput extends AbstractLogicalInput {
 
   private static final Log LOG = LogFactory.getLog(ShuffledUnorderedKVInput.class);
-  
+
   private Configuration conf;
   private ShuffleManager shuffleManager;
   private final BlockingQueue<Event> pendingEvents = new LinkedBlockingQueue<Event>();
@@ -61,10 +60,10 @@ public class ShuffledUnorderedKVInput extends AbstractLogicalInput {
   private MemoryUpdateCallbackHandler memoryUpdateCallbackHandler;
   @SuppressWarnings("rawtypes")
   private ShuffledUnorderedKVReader kvReader;
-  
+
   private final AtomicBoolean isStarted = new AtomicBoolean(false);
   private TezCounter inputRecordCounter;
-  
+
   private SimpleFetchedInputAllocator inputManager;
   private ShuffleEventHandler inputEventHandler;
 
@@ -203,8 +202,8 @@ public class ShuffledUnorderedKVInput extends AbstractLogicalInput {
     return SimpleFetchedInputAllocator.getInitialMemoryReq(conf,
         getContext().getTotalMemoryAvailableToTask());
   }
-  
-  
+
+
   @SuppressWarnings("rawtypes")
   private ShuffledUnorderedKVReader createReader(TezCounter inputRecordCounter, CompressionCodec codec,
       int ifileBufferSize, boolean ifileReadAheadEnabled, int ifileReadAheadLength)
@@ -213,4 +212,43 @@ public class ShuffledUnorderedKVInput extends AbstractLogicalInput {
         ifileReadAheadLength, ifileBufferSize, inputRecordCounter);
   }
 
+  private static final Set<String> confKeys = new HashSet<String>();
+  // Populated exactly once by the static initializer; only exposed as an unmodifiable view.
+  static {
+    Collections.addAll(confKeys, TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
+        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES,
+        TezJobConfig.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE,
+        TezJobConfig.TEZ_RUNTIME_IO_SORT_FACTOR,
+        TezJobConfig.TEZ_RUNTIME_COUNTERS_MAX_KEY,
+        TezJobConfig.TEZ_RUNTIME_COUNTER_GROUP_NAME_MAX_KEY,
+        TezJobConfig.TEZ_RUNTIME_COUNTER_NAME_MAX_KEY,
+        TezJobConfig.TEZ_RUNTIME_COUNTER_GROUPS_MAX_KEY,
+        TezJobConfig.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES,
+        TezJobConfig.TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT,
+        TezJobConfig.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE,
+        TezJobConfig.TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR,
+        TezJobConfig.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT,
+        TezJobConfig.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_ENABLED,
+        TezJobConfig.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_MAX_CONNECTIONS,
+        TezJobConfig.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT,
+        TezJobConfig.TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE,
+        TezJobConfig.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL,
+        TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT,
+        TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT,
+        TezJobConfig.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT,
+        TezJobConfig.TEZ_RUNTIME_INPUT_BUFFER_PERCENT,
+        TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS,
+        TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS,
+        TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_IS_COMPRESSED,
+        TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC);
+  }
+
+  // TODO Maybe add helper methods to extract keys
+  // TODO Maybe add constants or an Enum to access the keys
+  /** Returns an unmodifiable view of the configuration keys registered for this Input. */
+  @InterfaceAudience.Private
+  public static Set<String> getConfigurationKeySet() {
+    return Collections.unmodifiableSet(confKeys);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3535b142/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
index 908635b..bc32ede 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
@@ -21,11 +21,17 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.BitSet;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -48,10 +54,6 @@ import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
-
 /**
  * <code>OnFileSortedOutput</code> is an {@link AbstractLogicalOutput} which sorts key/value pairs 
  * written to it and persists it to a file.
@@ -187,4 +189,40 @@ public class OnFileSortedOutput extends AbstractLogicalOutput {
 
     return events;
   }
-}
+
+
+  private static final Set<String> confKeys = new HashSet<String>();
+  // Populated exactly once by the static initializer; only exposed as an unmodifiable view.
+  static {
+    Collections.addAll(confKeys, TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
+        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES,
+        TezJobConfig.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE,
+        TezJobConfig.TEZ_RUNTIME_IO_SORT_FACTOR,
+        TezJobConfig.TEZ_RUNTIME_SORT_SPILL_PERCENT,
+        TezJobConfig.TEZ_RUNTIME_IO_SORT_MB,
+        TezJobConfig.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES,
+        TezJobConfig.TEZ_RUNTIME_COMBINE_MIN_SPILLS,
+        TezJobConfig.TEZ_RUNTIME_SORT_THREADS,
+        TezJobConfig.TEZ_RUNTIME_PARTITIONER_CLASS,
+        TezJobConfig.TEZ_RUNTIME_COMBINER_CLASS,
+        TezJobConfig.TEZ_RUNTIME_COUNTERS_MAX_KEY,
+        TezJobConfig.TEZ_RUNTIME_COUNTER_GROUP_NAME_MAX_KEY,
+        TezJobConfig.TEZ_RUNTIME_COUNTER_NAME_MAX_KEY,
+        TezJobConfig.TEZ_RUNTIME_COUNTER_GROUPS_MAX_KEY,
+        TezJobConfig.TEZ_RUNTIME_INTERNAL_SORTER_CLASS,
+        TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS,
+        TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS,
+        TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS,
+        TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS,
+        TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC,
+        TezJobConfig.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED);
+  }
+
+  // TODO Maybe add helper methods to extract keys
+  // TODO Maybe add constants or an Enum to access the keys
+  /** Returns an unmodifiable view of the configuration keys registered for this Output. */
+  @InterfaceAudience.Private
+  public static Set<String> getConfigurationKeySet() {
+    return Collections.unmodifiableSet(confKeys);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3535b142/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
index ccff8e0..f5e42ac 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
@@ -21,10 +21,13 @@ package org.apache.tez.runtime.library.output;
 import java.nio.ByteBuffer;
 import java.util.BitSet;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -160,4 +163,28 @@ public class OnFileUnorderedKVOutput extends AbstractLogicalOutput {
     return System.getenv(ApplicationConstants.Environment.NM_HOST.toString());
   }
 
+  private static final Set<String> confKeys = new HashSet<String>();
+  // Populated exactly once by the static initializer; only exposed as an unmodifiable view.
+  static {
+    Collections.addAll(confKeys, TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
+        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES,
+        TezJobConfig.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE,
+        TezJobConfig.TEZ_RUNTIME_COUNTERS_MAX_KEY,
+        TezJobConfig.TEZ_RUNTIME_COUNTER_GROUP_NAME_MAX_KEY,
+        TezJobConfig.TEZ_RUNTIME_COUNTER_NAME_MAX_KEY,
+        TezJobConfig.TEZ_RUNTIME_COUNTER_GROUPS_MAX_KEY,
+        TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS,
+        TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS,
+        TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS,
+        TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC,
+        TezJobConfig.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED);
+  }
+
+  // TODO Maybe add helper methods to extract keys
+  // TODO Maybe add constants or an Enum to access the keys
+  /** Returns an unmodifiable view of the configuration keys registered for this Output. */
+  @InterfaceAudience.Private
+  public static Set<String> getConfigurationKeySet() {
+    return Collections.unmodifiableSet(confKeys);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3535b142/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedPartitionedKVOutput.java
index d6aab6b..102ca4e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedPartitionedKVOutput.java
@@ -19,12 +19,17 @@
 package org.apache.tez.runtime.library.output;
 
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.google.common.base.Preconditions;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.runtime.api.Event;
@@ -34,17 +39,10 @@ import org.apache.tez.runtime.api.Writer;
 import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
 import org.apache.tez.runtime.library.common.writers.UnorderedPartitionedKVWriter;
 
-import com.google.common.base.Preconditions;
-
 /**
  * <code>OnFileUnorderedPartitionedKVOutput</code> is a {@link LogicalOutput} which can be used to
  * write Key-Value pairs. The key-value pairs are written to the correct partition based on the
  * configured Partitioner.
- * 
- * This currently acts as a usable placeholder for writing unordered output (the data is sorted,
- * which should be functionally correct since there's no guarantees on order without a sort).
- * TEZ-661 to add a proper implementation.
- * 
  */
 public class OnFileUnorderedPartitionedKVOutput implements LogicalOutput {
 
@@ -62,7 +60,8 @@ public class OnFileUnorderedPartitionedKVOutput implements LogicalOutput {
     this.outputContext = outputContext;
     this.conf = TezUtils.createConfFromUserPayload(outputContext.getUserPayload());
     this.conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, outputContext.getWorkDirs());
-    this.conf.setInt(TezRuntimeFrameworkConfigs.TEZ_RUNTIME_NUM_EXPECTED_PARTITIONS, this.numPhysicalOutputs);
+    this.conf.setInt(TezRuntimeFrameworkConfigs.TEZ_RUNTIME_NUM_EXPECTED_PARTITIONS,
+        this.numPhysicalOutputs);
     this.memoryUpdateCallbackHandler = new MemoryUpdateCallbackHandler();
     outputContext.requestInitialMemory(
         UnorderedPartitionedKVWriter.getInitialMemoryRequirement(conf,
@@ -103,4 +102,33 @@ public class OnFileUnorderedPartitionedKVOutput implements LogicalOutput {
   public synchronized void setNumPhysicalOutputs(int numOutputs) {
     this.numPhysicalOutputs = numOutputs;
   }
+
+  private static final Set<String> confKeys = new HashSet<String>();
+  // Populated exactly once by the static initializer; only exposed as an unmodifiable view.
+  static {
+    Collections.addAll(confKeys, TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
+        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES,
+        TezJobConfig.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE,
+        TezJobConfig.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES,
+        TezJobConfig.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB,
+        TezJobConfig.TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES,
+        TezJobConfig.TEZ_RUNTIME_PARTITIONER_CLASS,
+        TezJobConfig.TEZ_RUNTIME_COUNTERS_MAX_KEY,
+        TezJobConfig.TEZ_RUNTIME_COUNTER_GROUP_NAME_MAX_KEY,
+        TezJobConfig.TEZ_RUNTIME_COUNTER_NAME_MAX_KEY,
+        TezJobConfig.TEZ_RUNTIME_COUNTER_GROUPS_MAX_KEY,
+        TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS,
+        TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS,
+        TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS,
+        TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC,
+        TezJobConfig.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED);
+  }
+
+  // TODO Maybe add helper methods to extract keys
+  // TODO Maybe add constants or an Enum to access the keys
+  /** Returns an unmodifiable view of the configuration keys registered for this Output. */
+  @InterfaceAudience.Private
+  public static Set<String> getConfigurationKeySet() {
+    return Collections.unmodifiableSet(confKeys);
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3535b142/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOnFileSortedOutputConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOnFileSortedOutputConfiguration.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOnFileSortedOutputConfiguration.java
new file mode 100644
index 0000000..c3b82c9
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOnFileSortedOutputConfiguration.java
@@ -0,0 +1,197 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.tez.runtime.library.conf;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezJobConfig;
+import org.junit.Test;
+
+public class TestOnFileSortedOutputConfiguration {
+
+  /**
+   * Serializes {@code configuration} and deserializes it into a fresh instance, returning the
+   * rebuilt {@link Configuration}. All assertions run against this rebuilt copy, so every test
+   * also exercises the toByteArray / fromByteArray round trip.
+   */
+  private static Configuration roundTrip(OnFileSortedOutputConfiguration configuration) {
+    byte[] confBytes = configuration.toByteArray();
+    OnFileSortedOutputConfiguration rebuilt = new OnFileSortedOutputConfiguration();
+    rebuilt.fromByteArray(confBytes);
+    return rebuilt.conf;
+  }
+
+  /**
+   * Asserts that {@code npe} carries the builder's "cannot be null" message. A missing message is
+   * reported as an assertion failure instead of triggering a second NullPointerException.
+   */
+  private static void assertCannotBeNull(NullPointerException npe) {
+    String message = npe.getMessage();
+    assertTrue("Unexpected NullPointerException message: " + message,
+        message != null && message.contains("cannot be null"));
+  }
+
+  /** A null key class, value class or partitioner class must be rejected by the builder. */
+  @Test
+  public void testNullParams() {
+    try {
+      OnFileSortedOutputConfiguration.newBuilder(
+          null, "VALUE", "PARTITIONER", null);
+      fail("Expecting a null parameter list to fail");
+    } catch (NullPointerException npe) {
+      assertCannotBeNull(npe);
+    }
+
+    try {
+      OnFileSortedOutputConfiguration.newBuilder(
+          "KEY", null, "PARTITIONER", null);
+      fail("Expecting a null parameter list to fail");
+    } catch (NullPointerException npe) {
+      assertCannotBeNull(npe);
+    }
+
+    try {
+      OnFileSortedOutputConfiguration.newBuilder(
+          "KEY", "VALUE", null, null);
+      fail("Expecting a null parameter list to fail");
+    } catch (NullPointerException npe) {
+      assertCannotBeNull(npe);
+    }
+  }
+
+  /**
+   * Verifies the programmatic setters, the key/value and map forms of setAdditionalConfiguration,
+   * and setFromConfiguration. Unrecognized keys (the test.* keys) must be dropped, while the
+   * fs.*, io.* and file.* keys must be retained.
+   */
+  @Test
+  public void testSetters() {
+    Configuration fromConf = new Configuration(false);
+    fromConf.set("test.conf.key.1", "confkey1");
+    fromConf.setInt(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, 1111);
+    fromConf.set("fs.shouldExist", "fs");
+    Map<String, String> additionalConf = new HashMap<String, String>();
+    additionalConf.put("test.key.2", "key2");
+    additionalConf.put("io.shouldExist", "io");
+    additionalConf.put(TezJobConfig.TEZ_RUNTIME_INTERNAL_SORTER_CLASS, "TestInternalSorter");
+    OnFileSortedOutputConfiguration.Builder builder =
+        OnFileSortedOutputConfiguration.newBuilder("KEY", "VALUE", "PARTITIONER", null)
+            .setKeyComparatorClass("KEY_COMPARATOR")
+            .enableCompression("CustomCodec")
+            .setSortBufferSize(2048)
+            .setAdditionalConfiguration("test.key.1", "key1")
+            .setAdditionalConfiguration("file.shouldExist", "file")
+            .setAdditionalConfiguration(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
+                String.valueOf(false))
+            .setAdditionalConfiguration(additionalConf)
+            .setFromConfiguration(fromConf);
+
+    OnFileSortedOutputConfiguration configuration = builder.build();
+    Configuration conf = roundTrip(configuration);
+
+    // Verify programmatic API usage
+    assertEquals(2048, conf.getInt(TezJobConfig.TEZ_RUNTIME_IO_SORT_MB, 0));
+    assertEquals("KEY", conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS, ""));
+    assertEquals("VALUE", conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS, ""));
+    assertEquals("PARTITIONER", conf.get(TezJobConfig.TEZ_RUNTIME_PARTITIONER_CLASS, ""));
+    assertEquals("CustomCodec",
+        conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC, ""));
+    assertEquals(true, conf.getBoolean(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS,
+        false));
+    assertEquals("KEY_COMPARATOR",
+        conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS));
+
+    // Verify additional configs
+    assertEquals(false, conf.getBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
+        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT));
+    assertEquals(1111, conf.getInt(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES,
+        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT));
+    assertEquals("TestInternalSorter",
+        conf.get(TezJobConfig.TEZ_RUNTIME_INTERNAL_SORTER_CLASS, ""));
+    assertEquals("io", conf.get("io.shouldExist"));
+    assertEquals("file", conf.get("file.shouldExist"));
+    assertEquals("fs", conf.get("fs.shouldExist"));
+    assertNull(conf.get("test.conf.key.1"));
+    assertNull(conf.get("test.key.1"));
+    assertNull(conf.get("test.key.2"));
+  }
+
+  /**
+   * With only the required builder arguments, checks the defaults seen after a round trip.
+   * IFile readahead resolves to true, and the output compression codec resolves to "TestCodec"
+   * (presumably supplied by a test-scoped site configuration; TODO confirm). The input-side
+   * codec key must be absent.
+   */
+  @Test
+  public void testDefaultConfigsUsed() {
+    OnFileSortedOutputConfiguration.Builder builder =
+        OnFileSortedOutputConfiguration.newBuilder("KEY", "VALUE", "PARTITIONER", null);
+    OnFileSortedOutputConfiguration configuration = builder.build();
+    Configuration conf = roundTrip(configuration);
+
+    assertEquals(true, conf.getBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
+        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT));
+
+    // Default Output property present.
+    assertEquals("TestCodec",
+        conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC, ""));
+    // Input property should be absent
+    assertEquals("DEFAULT",
+        conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC, "DEFAULT"));
+
+    // Verify whatever was configured
+    assertEquals("KEY", conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS, ""));
+    assertEquals("VALUE", conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS, ""));
+    assertEquals("PARTITIONER", conf.get(TezJobConfig.TEZ_RUNTIME_PARTITIONER_CLASS, ""));
+  }
+
+  /**
+   * Keys from the partitioner configuration are carried through, but must not override the
+   * output's own compression codec setting.
+   */
+  @Test
+  public void testPartitionerConfigs() {
+    Configuration partitionerConf = new Configuration(false);
+    partitionerConf.set("partitioner.test.key", "PARTITIONERKEY");
+    partitionerConf
+        .set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC, "InvalidKeyOverride");
+    OnFileSortedOutputConfiguration.Builder builder =
+        OnFileSortedOutputConfiguration.newBuilder("KEY", "VALUE", "PARTITIONER", partitionerConf);
+
+    OnFileSortedOutputConfiguration configuration = builder.build();
+    Configuration conf = roundTrip(configuration);
+
+    // Default Output property should not be overridden based on partitioner config
+    assertEquals("TestCodec",
+        conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC, ""));
+
+    assertEquals("PARTITIONERKEY", conf.get("partitioner.test.key"));
+  }
+
+  /**
+   * Keys from the combiner configuration are carried through, but must not override the
+   * output's own compression codec setting.
+   */
+  @Test
+  public void testCombinerConfigs() {
+    Configuration combinerConf = new Configuration(false);
+    combinerConf.set("combiner.test.key", "COMBINERKEY");
+    combinerConf
+        .set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC, "InvalidKeyOverride");
+    OnFileSortedOutputConfiguration.Builder builder =
+        OnFileSortedOutputConfiguration.newBuilder("KEY", "VALUE", "PARTITIONER", null)
+            .setCombiner("COMBINER", combinerConf);
+
+    OnFileSortedOutputConfiguration configuration = builder.build();
+    Configuration conf = roundTrip(configuration);
+
+    // Default Output property should not be overridden based on combiner config
+    assertEquals("TestCodec",
+        conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC, ""));
+
+    assertEquals("COMBINERKEY", conf.get("combiner.test.key"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3535b142/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOnFileUnorderedKVOutputConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOnFileUnorderedKVOutputConfiguration.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOnFileUnorderedKVOutputConfiguration.java
new file mode 100644
index 0000000..d06de67
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOnFileUnorderedKVOutputConfiguration.java
@@ -0,0 +1,135 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.tez.runtime.library.conf;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezJobConfig;
+import org.junit.Test;
+
+public class TestOnFileUnorderedKVOutputConfiguration {
+
+  /**
+   * Serializes {@code original} to bytes, loads those bytes into a new instance and returns the
+   * rebuilt {@link Configuration}, so that assertions also cover the serialization round trip.
+   */
+  private static Configuration serializeAndRebuild(
+      OnFileUnorderedKVOutputConfiguration original) {
+    byte[] payload = original.toByteArray();
+    OnFileUnorderedKVOutputConfiguration copy = new OnFileUnorderedKVOutputConfiguration();
+    copy.fromByteArray(payload);
+    return copy.conf;
+  }
+
+  /** Checks that a builder rejection carries the expected "cannot be null" message. */
+  private static void verifyNullRejection(NullPointerException rejection) {
+    assertTrue(rejection.getMessage().contains("cannot be null"));
+  }
+
+  /** A null key class or value class must be rejected by the builder. */
+  @Test
+  public void testNullParams() {
+    try {
+      OnFileUnorderedKVOutputConfiguration.newBuilder(null, "VALUE");
+      fail("Expecting a null parameter list to fail");
+    } catch (NullPointerException rejection) {
+      verifyNullRejection(rejection);
+    }
+
+    try {
+      OnFileUnorderedKVOutputConfiguration.newBuilder("KEY", null);
+      fail("Expecting a null parameter list to fail");
+    } catch (NullPointerException rejection) {
+      verifyNullRejection(rejection);
+    }
+  }
+
+  /**
+   * Exercises the programmatic setters, both forms of setAdditionalConfiguration and
+   * setFromConfiguration, then checks which keys survive. The test.* keys are dropped, while the
+   * fs.*, io.* and file.* keys are kept.
+   */
+  @Test
+  public void testSetters() {
+    Configuration sourceConf = new Configuration(false);
+    sourceConf.set("test.conf.key.1", "confkey1");
+    sourceConf.setInt(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, 1111);
+    sourceConf.set("io.shouldExist", "io");
+
+    Map<String, String> extraSettings = new HashMap<String, String>();
+    extraSettings.put("test.key.2", "key2");
+    // NOTE(review): this key is supplied but never asserted on below; confirm whether this
+    // output is expected to keep or drop it.
+    extraSettings.put(
+        TezJobConfig.TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES, "2222");
+    extraSettings.put("file.shouldExist", "file");
+
+    OnFileUnorderedKVOutputConfiguration built =
+        OnFileUnorderedKVOutputConfiguration.newBuilder("KEY", "VALUE")
+            .enableCompression("CustomCodec")
+            .setAdditionalConfiguration("fs.shouldExist", "fs")
+            .setAdditionalConfiguration("test.key.1", "key1")
+            .setAdditionalConfiguration(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
+                String.valueOf(false))
+            .setAdditionalConfiguration(extraSettings)
+            .setFromConfiguration(sourceConf)
+            .build();
+
+    Configuration conf = serializeAndRebuild(built);
+
+    // Values supplied through the dedicated builder methods.
+    assertEquals("KEY", conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS, ""));
+    assertEquals("VALUE", conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS, ""));
+    assertEquals("CustomCodec",
+        conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC, ""));
+    assertEquals(true,
+        conf.getBoolean(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS, false));
+
+    // Values supplied through the generic additional / from-configuration paths.
+    assertEquals(false, conf.getBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
+        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT));
+    assertEquals(1111, conf.getInt(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES,
+        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT));
+    assertEquals("io", conf.get("io.shouldExist"));
+    assertEquals("file", conf.get("file.shouldExist"));
+    assertEquals("fs", conf.get("fs.shouldExist"));
+    assertNull(conf.get("test.conf.key.1"));
+    assertNull(conf.get("test.key.1"));
+    assertNull(conf.get("test.key.2"));
+  }
+
+  /**
+   * With only key and value classes set, checks the defaults seen after a round trip. Readahead
+   * resolves to true, and the output codec resolves to "TestCodec" (presumably from a test-scoped
+   * site configuration; TODO confirm). The input-side codec key must be absent.
+   */
+  @Test
+  public void testDefaultConfigsUsed() {
+    Configuration conf = serializeAndRebuild(
+        OnFileUnorderedKVOutputConfiguration.newBuilder("KEY", "VALUE").build());
+
+    assertEquals(true, conf.getBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
+        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT));
+
+    // The output-side default must be present...
+    assertEquals("TestCodec",
+        conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC, ""));
+    // ...while the input-side key must not leak in.
+    assertEquals("DEFAULT",
+        conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC, "DEFAULT"));
+
+    // The explicitly supplied classes are preserved.
+    assertEquals("KEY", conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS, ""));
+    assertEquals("VALUE", conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS, ""));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3535b142/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOnFileUnorderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOnFileUnorderedPartitionedKVOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOnFileUnorderedPartitionedKVOutput.java
new file mode 100644
index 0000000..d9a7c87
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOnFileUnorderedPartitionedKVOutput.java
@@ -0,0 +1,176 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.tez.runtime.library.conf;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezJobConfig;
+import org.junit.Test;
+
+public class TestOnFileUnorderedPartitionedKVOutput {
+
+  /**
+   * Serializes {@code original} to bytes, loads those bytes into a new instance and returns the
+   * rebuilt {@link Configuration}, so that assertions also cover the serialization round trip.
+   */
+  private static Configuration serializeAndRebuild(
+      OnFileUnorderedPartitionedKVOutputConfiguration original) {
+    byte[] payload = original.toByteArray();
+    OnFileUnorderedPartitionedKVOutputConfiguration copy =
+        new OnFileUnorderedPartitionedKVOutputConfiguration();
+    copy.fromByteArray(payload);
+    return copy.conf;
+  }
+
+  /** Checks that a builder rejection carries the expected "cannot be null" message. */
+  private static void verifyNullRejection(NullPointerException rejection) {
+    assertTrue(rejection.getMessage().contains("cannot be null"));
+  }
+
+  /** A null key class, value class or partitioner class must be rejected by the builder. */
+  @Test
+  public void testNullParams() {
+    try {
+      OnFileUnorderedPartitionedKVOutputConfiguration.newBuilder(
+          null, "VALUE", "PARTITIONER", null);
+      fail("Expecting a null parameter list to fail");
+    } catch (NullPointerException rejection) {
+      verifyNullRejection(rejection);
+    }
+
+    try {
+      OnFileUnorderedPartitionedKVOutputConfiguration.newBuilder(
+          "KEY", null, "PARTITIONER", null);
+      fail("Expecting a null parameter list to fail");
+    } catch (NullPointerException rejection) {
+      verifyNullRejection(rejection);
+    }
+
+    try {
+      OnFileUnorderedPartitionedKVOutputConfiguration.newBuilder(
+          "KEY", "VALUE", null, null);
+      fail("Expecting a null parameter list to fail");
+    } catch (NullPointerException rejection) {
+      verifyNullRejection(rejection);
+    }
+  }
+
+  /**
+   * Exercises the programmatic setters, both forms of setAdditionalConfiguration and
+   * setFromConfiguration, then checks which keys survive. The test.* keys are dropped, while the
+   * fs.*, io.* and file.* keys are kept.
+   */
+  @Test
+  public void testSetters() {
+    Configuration sourceConf = new Configuration(false);
+    sourceConf.set("test.conf.key.1", "confkey1");
+    sourceConf.setInt(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, 1111);
+    sourceConf.set("io.shouldExist", "io");
+
+    Map<String, String> extraSettings = new HashMap<String, String>();
+    extraSettings.put("test.key.2", "key2");
+    extraSettings.put(
+        TezJobConfig.TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES, "2222");
+    extraSettings.put("file.shouldExist", "file");
+
+    OnFileUnorderedPartitionedKVOutputConfiguration built =
+        OnFileUnorderedPartitionedKVOutputConfiguration
+            .newBuilder("KEY", "VALUE", "PARTITIONER", null)
+            .enableCompression("CustomCodec")
+            .setAvailableBufferSize(1111)
+            .setAdditionalConfiguration("fs.shouldExist", "fs")
+            .setAdditionalConfiguration("test.key.1", "key1")
+            .setAdditionalConfiguration(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
+                String.valueOf(false))
+            .setAdditionalConfiguration(extraSettings)
+            .setFromConfiguration(sourceConf)
+            .build();
+
+    Configuration conf = serializeAndRebuild(built);
+
+    // Values supplied through the dedicated builder methods.
+    assertEquals(1111, conf.getInt(TezJobConfig.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB, 0));
+    assertEquals("KEY", conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS, ""));
+    assertEquals("VALUE", conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS, ""));
+    assertEquals("PARTITIONER", conf.get(TezJobConfig.TEZ_RUNTIME_PARTITIONER_CLASS, ""));
+    assertEquals("CustomCodec",
+        conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC, ""));
+    assertEquals(true,
+        conf.getBoolean(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS, false));
+
+    // Values supplied through the generic additional / from-configuration paths.
+    assertEquals(false, conf.getBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
+        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT));
+    assertEquals(1111, conf.getInt(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES,
+        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT));
+    assertEquals(2222,
+        conf.getInt(TezJobConfig.TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES, 0));
+    assertEquals("io", conf.get("io.shouldExist"));
+    assertEquals("file", conf.get("file.shouldExist"));
+    assertEquals("fs", conf.get("fs.shouldExist"));
+    assertNull(conf.get("test.conf.key.1"));
+    assertNull(conf.get("test.key.1"));
+    assertNull(conf.get("test.key.2"));
+  }
+
+  /**
+   * With only the required builder arguments, checks the defaults seen after a round trip.
+   * Readahead resolves to true, and the output codec resolves to "TestCodec" (presumably from a
+   * test-scoped site configuration; TODO confirm). The input-side codec key must be absent.
+   */
+  @Test
+  public void testDefaultConfigsUsed() {
+    Configuration conf = serializeAndRebuild(
+        OnFileUnorderedPartitionedKVOutputConfiguration
+            .newBuilder("KEY", "VALUE", "PARTITIONER", null)
+            .build());
+
+    assertEquals(true, conf.getBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
+        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT));
+
+    // The output-side default must be present...
+    assertEquals("TestCodec",
+        conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC, ""));
+    // ...while the input-side key must not leak in.
+    assertEquals("DEFAULT",
+        conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC, "DEFAULT"));
+
+    // The explicitly supplied classes are preserved.
+    assertEquals("KEY", conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS, ""));
+    assertEquals("VALUE", conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS, ""));
+    assertEquals("PARTITIONER", conf.get(TezJobConfig.TEZ_RUNTIME_PARTITIONER_CLASS, ""));
+  }
+
+  /**
+   * Keys from the partitioner configuration are carried through, but must not override the
+   * output's own compression codec setting.
+   */
+  @Test
+  public void testPartitionerConfigs() {
+    Configuration partitionerSettings = new Configuration(false);
+    partitionerSettings.set("partitioner.test.key", "PARTITIONERKEY");
+    partitionerSettings.set(
+        TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC, "InvalidKeyOverride");
+
+    Configuration conf = serializeAndRebuild(
+        OnFileUnorderedPartitionedKVOutputConfiguration
+            .newBuilder("KEY", "VALUE", "PARTITIONER", partitionerSettings)
+            .build());
+
+    // The partitioner's codec entry must not replace the output's own default.
+    assertEquals("TestCodec",
+        conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC, ""));
+
+    assertEquals("PARTITIONERKEY", conf.get("partitioner.test.key"));
+  }
+}


Mime
View raw message