tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject [4/6] TEZ-1132. Consistent naming of Input and Outputs (bikas)
Date Sat, 16 Aug 2014 03:05:25 GMT
http://git-wip-us.apache.org/repos/asf/tez/blob/469bf905/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVEdgeConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVEdgeConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVEdgeConfigurer.java
new file mode 100644
index 0000000..1e0e56c
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVEdgeConfigurer.java
@@ -0,0 +1,251 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.tez.runtime.library.conf;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.runtime.library.input.UnorderedKVInput;
+import org.apache.tez.runtime.library.output.UnorderedKVOutput;
+
+/**
+ * Configure payloads for the UnorderedKVOutput and UnorderedKVInput pair. <p>
+ *
+ * Values will be picked up from tez-site if not specified, otherwise defaults from
+ * {@link org.apache.tez.runtime.library.api.TezRuntimeConfiguration} will be used.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class UnorderedKVEdgeConfigurer extends HadoopKeyValuesBasedBaseEdgeConfigurer {
+  private final UnorderedKVOutputConfigurer outputConf;
+  private final UnorderedKVInputConfigurer inputConf;
+
+  private UnorderedKVEdgeConfigurer(
+      UnorderedKVOutputConfigurer outputConfiguration,
+      UnorderedKVInputConfigurer inputConfiguration) {
+    this.outputConf = outputConfiguration;
+    this.inputConf = inputConfiguration;
+
+  }
+
+  /**
+   * Create a builder to configure the relevant Input and Output
+   * @param keyClassName the key class name
+   * @param valueClassName the value class name
+   * @return a builder to configure the edge
+   */
+  public static Builder newBuilder(String keyClassName, String valueClassName) {
+    return new Builder(keyClassName, valueClassName);
+  }
+
+  @Override
+  public UserPayload getOutputPayload() {
+    return outputConf.toUserPayload();
+  }
+
+  @Override
+  public String getOutputClassName() {
+    return UnorderedKVOutput.class.getName();
+  }
+
+  @Override
+  public UserPayload getInputPayload() {
+    return inputConf.toUserPayload();
+  }
+
+  @Override
+  public String getInputClassName() {
+    return UnorderedKVInput.class.getName();
+  }
+
+  /**
+   * This is a convenience method for the typical usage of this edge, and creates an instance of
+   * {@link org.apache.tez.dag.api.EdgeProperty} which is likely to be used. <p>
+   * If custom edge properties are required, the methods to get the relevant payloads should be
+   * used. <p>
+   * In this case - DataMovementType.BROADCAST, EdgeProperty.DataSourceType.PERSISTED,
+   * EdgeProperty.SchedulingType.SEQUENTIAL
+   *
+   * @return an {@link org.apache.tez.dag.api.EdgeProperty} instance
+   */
+  public EdgeProperty createDefaultBroadcastEdgeProperty() {
+    EdgeProperty edgeProperty = new EdgeProperty(EdgeProperty.DataMovementType.BROADCAST,
+        EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL,
+        new OutputDescriptor(
+            getOutputClassName()).setUserPayload(getOutputPayload()),
+        new InputDescriptor(
+            getInputClassName()).setUserPayload(getInputPayload()));
+    return edgeProperty;
+  }
+
+  /**
+   * This is a convenience method for the typical usage of this edge, and creates an instance of
+   * {@link org.apache.tez.dag.api.EdgeProperty} which is likely to be used. <p>
+   * If custom edge properties are required, the methods to get the relevant payloads should be
+   * used. <p>
+   * In this case - DataMovementType.ONE_TO_ONE, EdgeProperty.DataSourceType.PERSISTED,
+   * EdgeProperty.SchedulingType.SEQUENTIAL
+   *
+   * @return an {@link org.apache.tez.dag.api.EdgeProperty} instance
+   */
+  public EdgeProperty createDefaultOneToOneEdgeProperty() {
+    EdgeProperty edgeProperty = new EdgeProperty(EdgeProperty.DataMovementType.ONE_TO_ONE,
+        EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL,
+        new OutputDescriptor(
+            getOutputClassName()).setUserPayload(getOutputPayload()),
+        new InputDescriptor(
+            getInputClassName()).setUserPayload(getInputPayload()));
+    return edgeProperty;
+  }
+
+  /**
+   * This is a convenience method for creating an Edge descriptor based on the specified
+   * EdgeManagerDescriptor.
+   *
+   * @param edgeManagerDescriptor the custom edge specification
+   * @return an {@link org.apache.tez.dag.api.EdgeProperty} instance
+   */
+  public EdgeProperty createDefaultCustomEdgeProperty(EdgeManagerPluginDescriptor edgeManagerDescriptor) {
+    Preconditions.checkNotNull(edgeManagerDescriptor, "EdgeManagerDescriptor cannot be null");
+    EdgeProperty edgeProperty =
+        new EdgeProperty(edgeManagerDescriptor, EdgeProperty.DataSourceType.PERSISTED,
+            EdgeProperty.SchedulingType.SEQUENTIAL,
+            new OutputDescriptor(getOutputClassName()).setUserPayload(getOutputPayload()),
+            new InputDescriptor(getInputClassName()).setUserPayload(getInputPayload()));
+    return edgeProperty;
+  }
+
+  @InterfaceAudience.Public
+  @InterfaceStability.Evolving
+  public static class Builder extends HadoopKeyValuesBasedBaseEdgeConfigurer.Builder<Builder> {
+
+    private final UnorderedKVOutputConfigurer.Builder outputBuilder =
+        new UnorderedKVOutputConfigurer.Builder();
+    private final UnorderedKVOutputConfigurer.SpecificBuilder<UnorderedKVEdgeConfigurer.Builder>
+        specificOutputBuilder =
+        new UnorderedKVOutputConfigurer.SpecificBuilder<UnorderedKVEdgeConfigurer.Builder>(
+            this, outputBuilder);
+
+    private final UnorderedKVInputConfigurer.Builder inputBuilder =
+        new UnorderedKVInputConfigurer.Builder();
+    private final UnorderedKVInputConfigurer.SpecificBuilder<UnorderedKVEdgeConfigurer.Builder>
+        specificInputBuilder =
+        new UnorderedKVInputConfigurer.SpecificBuilder<UnorderedKVEdgeConfigurer.Builder>(
+            this, inputBuilder);
+
+    @InterfaceAudience.Private
+    Builder(String keyClassName, String valueClassName) {
+      outputBuilder.setKeyClassName(keyClassName);
+      outputBuilder.setValueClassName(valueClassName);
+      inputBuilder.setKeyClassName(keyClassName);
+      inputBuilder.setValueClassName(valueClassName);
+    }
+
+    @Override
+    public Builder setCompression(boolean enabled, @Nullable String compressionCodec) {
+      outputBuilder.setCompression(enabled, compressionCodec);
+      inputBuilder.setCompression(enabled, compressionCodec);
+      return this;
+    }
+
+    @Override
+    public Builder setAdditionalConfiguration(String key, String value) {
+      outputBuilder.setAdditionalConfiguration(key, value);
+      inputBuilder.setAdditionalConfiguration(key, value);
+      return this;
+    }
+
+    @Override
+    public Builder setAdditionalConfiguration(Map<String, String> confMap) {
+      outputBuilder.setAdditionalConfiguration(confMap);
+      inputBuilder.setAdditionalConfiguration(confMap);
+      return this;
+    }
+
+    @Override
+    public Builder setFromConfiguration(Configuration conf) {
+      outputBuilder.setFromConfiguration(conf);
+      inputBuilder.setFromConfiguration(conf);
+      return this;
+    }
+
+    /**
+     * Set serialization class responsible for providing serializer/deserializer for keys. It is
+     * applied to both the output and the input of this edge.
+     *
+     * @param serializationClassName the serialization class name
+     * @return instance of the current builder
+     */
+    public Builder setKeySerializationClass(String serializationClassName) {
+      outputBuilder.setKeySerializationClass(serializationClassName);
+      inputBuilder.setKeySerializationClass(serializationClassName);
+      return this;
+    }
+
+    /**
+     * Set serialization class responsible for providing serializer/deserializer for values.
+     *
+     * @param serializationClassName the serialization class name
+     * @return instance of the current builder
+     */
+    public Builder setValueSerializationClass(String serializationClassName) {
+      outputBuilder.setValueSerializationClass(serializationClassName);
+      inputBuilder.setValueSerializationClass(serializationClassName);
+      return this;
+    }
+
+    /**
+     * Configure the specific output
+     * @return a builder to configure the output
+     */
+    public UnorderedKVOutputConfigurer.SpecificBuilder<Builder> configureOutput() {
+      return specificOutputBuilder;
+    }
+
+    /**
+     * Configure the specific input
+     * @return a builder to configure the input
+     */
+    public UnorderedKVInputConfigurer.SpecificBuilder<Builder> configureInput() {
+      return specificInputBuilder;
+    }
+
+    /**
+     * Build and return an instance of the configuration
+     * @return an instance of the actual configuration
+     */
+    public UnorderedKVEdgeConfigurer build() {
+      return new UnorderedKVEdgeConfigurer(outputBuilder.build(), inputBuilder.build());
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/469bf905/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVInputConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVInputConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVInputConfigurer.java
new file mode 100644
index 0000000..2203928
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVInputConfigurer.java
@@ -0,0 +1,334 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.tez.runtime.library.conf;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Map;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.input.UnorderedKVInput;
+
+/**
+ * Configure {@link org.apache.tez.runtime.library.input.UnorderedKVInput}. <p>
+ *
+ * Values will be picked up from tez-site if not specified, otherwise defaults from
+ * {@link org.apache.tez.runtime.library.api.TezRuntimeConfiguration} will be used.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class UnorderedKVInputConfigurer {
+
+  /**
+   * Configure parameters which are specific to the Input.
+   */
+  @InterfaceAudience.Private
+  public static interface SpecificConfigurer<T> extends BaseConfigurer<T> {
+
+    /**
+     * Sets the buffer fraction, as a fraction of container size, to be used while fetching remote
+     * data.
+     *
+     * @param shuffleBufferFraction fraction of container size
+     * @return instance of the current builder
+     */
+    public T setShuffleBufferFraction(float shuffleBufferFraction);
+
+    /**
+     * Sets a size limit on the maximum segment size to be shuffled to disk. This is a fraction of
+     * the shuffle buffer.
+     *
+     * @param maxSingleSegmentFraction fraction of memory determined by ShuffleBufferFraction
+     * @return instance of the current builder
+     */
+    public T setMaxSingleMemorySegmentFraction(float maxSingleSegmentFraction);
+
+    /**
+     * Configure the point at which in memory segments will be merged and written out to a single
+     * large disk segment. This is specified as a
+     * fraction of the shuffle buffer. <p> Has no effect at the moment.
+     *
+     * @param mergeFraction fraction of memory determined by ShuffleBufferFraction, which when
+     *                      filled, will
+     *                      trigger a merge
+     * @return instance of the current builder
+     */
+    public T setMergeFraction(float mergeFraction);
+
+  }
+
+  @SuppressWarnings("rawtypes")
+  @InterfaceAudience.Public
+  @InterfaceStability.Evolving
+  public static class SpecificBuilder<E extends HadoopKeyValuesBasedBaseEdgeConfigurer.Builder> implements
+      SpecificConfigurer<SpecificBuilder> {
+
+    private final E edgeBuilder;
+    private final UnorderedKVInputConfigurer.Builder builder;
+
+
+    @InterfaceAudience.Private
+    SpecificBuilder(E edgeBuilder, UnorderedKVInputConfigurer.Builder builder) {
+      this.edgeBuilder = edgeBuilder;
+      this.builder = builder;
+    }
+
+    @Override
+    public SpecificBuilder<E> setShuffleBufferFraction(float shuffleBufferFraction) {
+      builder.setShuffleBufferFraction(shuffleBufferFraction);
+      return this;
+    }
+
+    @Override
+    public SpecificBuilder<E> setMaxSingleMemorySegmentFraction(float maxSingleSegmentFraction) {
+      builder.setMaxSingleMemorySegmentFraction(maxSingleSegmentFraction);
+      return this;
+    }
+
+    @Override
+    public SpecificBuilder<E> setMergeFraction(float mergeFraction) {
+      builder.setMergeFraction(mergeFraction);
+      return this;
+    }
+
+    @Override
+    public SpecificBuilder<E> setAdditionalConfiguration(String key, String value) {
+      builder.setAdditionalConfiguration(key, value);
+      return this; // typed as SpecificBuilder<E> (not raw) so a chained done() still yields E
+    }
+
+    @Override
+    public SpecificBuilder<E> setAdditionalConfiguration(Map<String, String> confMap) {
+      builder.setAdditionalConfiguration(confMap);
+      return this; // typed as SpecificBuilder<E> (not raw) so a chained done() still yields E
+    }
+
+    @Override
+    public SpecificBuilder<E> setFromConfiguration(Configuration conf) {
+      builder.setFromConfiguration(conf);
+      return this; // typed as SpecificBuilder<E> (not raw) so a chained done() still yields E
+    }
+
+    public E done() {
+      return edgeBuilder;
+    }
+
+  }
+
+  @InterfaceAudience.Private
+  @VisibleForTesting
+  Configuration conf;
+
+  @InterfaceAudience.Private
+  @VisibleForTesting
+  UnorderedKVInputConfigurer() {
+  }
+
+  private UnorderedKVInputConfigurer(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Get a UserPayload representation of the Configuration
+   * @return a {@link org.apache.tez.dag.api.UserPayload} instance
+   */
+  public UserPayload toUserPayload() {
+    try {
+      return TezUtils.createUserPayloadFromConf(conf);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @InterfaceAudience.Private
+  public void fromUserPayload(UserPayload payload) {
+    try {
+      this.conf = TezUtils.createConfFromUserPayload(payload);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public static Builder newBuilder(String keyClass, String valueClass) {
+    return new Builder(keyClass, valueClass);
+  }
+
+  @InterfaceAudience.Public
+  @InterfaceStability.Evolving
+  public static class Builder implements SpecificConfigurer<Builder> {
+
+    private final Configuration conf = new Configuration(false);
+
+    /**
+     * Create a configuration builder for {@link org.apache.tez.runtime.library.input.UnorderedKVInput}
+     *
+     * @param keyClassName         the key class name
+     * @param valueClassName       the value class name
+     */
+    @InterfaceAudience.Private
+    Builder(String keyClassName, String valueClassName) {
+      this();
+      Preconditions.checkNotNull(keyClassName, "Key class name cannot be null");
+      Preconditions.checkNotNull(valueClassName, "Value class name cannot be null");
+      setKeyClassName(keyClassName);
+      setValueClassName(valueClassName);
+    }
+
+    @InterfaceAudience.Private
+    Builder() {
+      Map<String, String> tezDefaults = ConfigUtils
+          .extractConfigurationMap(TezRuntimeConfiguration.getTezRuntimeConfigDefaults(),
+              UnorderedKVInput.getConfigurationKeySet());
+      ConfigUtils.addConfigMapToConfiguration(this.conf, tezDefaults);
+      ConfigUtils.addConfigMapToConfiguration(this.conf, TezRuntimeConfiguration.getOtherConfigDefaults());
+    }
+
+    @InterfaceAudience.Private
+    Builder setKeyClassName(String keyClassName) {
+      Preconditions.checkNotNull(keyClassName, "Key class name cannot be null");
+      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, keyClassName);
+      return this;
+    }
+
+    @InterfaceAudience.Private
+    Builder setValueClassName(String valueClassName) {
+      Preconditions.checkNotNull(valueClassName, "Value class name cannot be null");
+      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, valueClassName);
+      return this;
+    }
+
+    @Override
+    public Builder setShuffleBufferFraction(float shuffleBufferFraction) {
+      this.conf
+          .setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, shuffleBufferFraction);
+      return this;
+    }
+
+    @Override
+    public Builder setMaxSingleMemorySegmentFraction(float maxSingleSegmentFraction) {
+      this.conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT,
+          maxSingleSegmentFraction);
+      return this;
+    }
+
+    @Override
+    public Builder setMergeFraction(float mergeFraction) {
+      this.conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, mergeFraction);
+      return this;
+    }
+
+    @Override
+    public Builder setAdditionalConfiguration(String key, String value) {
+      Preconditions.checkNotNull(key, "Key cannot be null");
+      if (ConfigUtils.doesKeyQualify(key,
+          Lists.newArrayList(UnorderedKVInput.getConfigurationKeySet(),
+              TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()),
+          TezRuntimeConfiguration.getAllowedPrefixes())) {
+        if (value == null) {
+          this.conf.unset(key);
+        } else {
+          this.conf.set(key, value);
+        }
+      }
+      return this;
+    }
+
+    @Override
+    public Builder setAdditionalConfiguration(Map<String, String> confMap) {
+      Preconditions.checkNotNull(confMap, "ConfMap cannot be null");
+      Map<String, String> map = ConfigUtils.extractConfigurationMap(confMap,
+          Lists.newArrayList(UnorderedKVInput.getConfigurationKeySet(),
+              TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()), TezRuntimeConfiguration.getAllowedPrefixes());
+      ConfigUtils.addConfigMapToConfiguration(this.conf, map);
+      return this;
+    }
+
+    @Override
+    public Builder setFromConfiguration(Configuration conf) {
+      // Maybe ensure this is the first call ? Otherwise this can end up overriding other parameters
+      Preconditions.checkArgument(conf != null, "Configuration cannot be null");
+      Map<String, String> map = ConfigUtils.extractConfigurationMap(conf,
+          Lists.newArrayList(UnorderedKVInput.getConfigurationKeySet(),
+              TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()), TezRuntimeConfiguration.getAllowedPrefixes());
+      ConfigUtils.addConfigMapToConfiguration(this.conf, map);
+      return this;
+    }
+
+    public Builder setCompression(boolean enabled, @Nullable String compressionCodec) {
+      this.conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, enabled);
+      if (enabled && compressionCodec != null) {
+        this.conf
+            .set(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, compressionCodec);
+      }
+      return this;
+    }
+
+    /**
+     * Set serialization class responsible for providing serializer/deserializer for keys. The
+     * class name is prepended to the configured list of serializations.
+     *
+     * @param serializationClassName the serialization class name
+     * @return instance of the current builder
+     */
+    public Builder setKeySerializationClass(String serializationClassName) {
+      Preconditions.checkArgument(serializationClassName != null, "serializationClassName cannot be null");
+      String current = this.conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY); // null if unset
+      this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, // prepend; never append "null"
+          current == null ? serializationClassName : serializationClassName + "," + current);
+      return this;
+    }
+
+    /**
+     * Set serialization class responsible for providing serializer/deserializer for values.
+     *
+     * @param serializationClassName the serialization class name
+     * @return instance of the current builder
+     */
+    public Builder setValueSerializationClass(String serializationClassName) {
+      Preconditions.checkArgument(serializationClassName != null, "serializationClassName cannot be null");
+      String current = this.conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY); // null if unset
+      this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, // prepend; never append "null"
+          current == null ? serializationClassName : serializationClassName + "," + current);
+      return this;
+    }
+
+    /**
+     * Create the actual configuration instance.
+     *
+     * @return an instance of the Configuration
+     */
+    public UnorderedKVInputConfigurer build() {
+      return new UnorderedKVInputConfigurer(this.conf);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/469bf905/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVOutputConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVOutputConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVOutputConfigurer.java
new file mode 100644
index 0000000..986ee45
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVOutputConfigurer.java
@@ -0,0 +1,259 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.tez.runtime.library.conf;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Map;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.output.UnorderedKVOutput;
+
+/**
+ * Configure {@link org.apache.tez.runtime.library.output.UnorderedKVOutput}. <p>
+ *
+ * Values will be picked up from tez-site if not specified, otherwise defaults from
+ * {@link org.apache.tez.runtime.library.api.TezRuntimeConfiguration} will be used.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class UnorderedKVOutputConfigurer {
+  /**
+   * Configure parameters which are specific to the Output.
+   */
+  @InterfaceAudience.Private
+  public static interface SpecificConfigurer<T> extends BaseConfigurer<T> {
+  }
+
+  @InterfaceAudience.Public
+  @InterfaceStability.Evolving
+  public static class SpecificBuilder<E extends HadoopKeyValuesBasedBaseEdgeConfigurer.Builder> implements
+      SpecificConfigurer<SpecificBuilder> {
+
+    private final E edgeBuilder;
+    private final Builder builder;
+
+    SpecificBuilder(E edgeBuilder, Builder builder) {
+      this.edgeBuilder = edgeBuilder;
+      this.builder = builder;
+    }
+
+    @Override
+    public SpecificBuilder<E> setAdditionalConfiguration(String key, String value) {
+      builder.setAdditionalConfiguration(key, value);
+      return this; // typed as SpecificBuilder<E> (not raw) so a chained done() still yields E
+    }
+
+    @Override
+    public SpecificBuilder<E> setAdditionalConfiguration(Map<String, String> confMap) {
+      builder.setAdditionalConfiguration(confMap);
+      return this; // typed as SpecificBuilder<E> (not raw) so a chained done() still yields E
+    }
+
+    @Override
+    public SpecificBuilder<E> setFromConfiguration(Configuration conf) {
+      builder.setFromConfiguration(conf);
+      return this; // typed as SpecificBuilder<E> (not raw) so a chained done() still yields E
+    }
+
+    public E done() {
+      return edgeBuilder;
+    }
+  }
+
+  @InterfaceAudience.Private
+  @VisibleForTesting
+  Configuration conf;
+
+  @InterfaceAudience.Private
+  @VisibleForTesting
+  UnorderedKVOutputConfigurer() {
+  }
+
+  private UnorderedKVOutputConfigurer(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Get a UserPayload representation of the Configuration
+   * @return a {@link org.apache.tez.dag.api.UserPayload} instance
+   */
+  public UserPayload toUserPayload() {
+    try {
+      return TezUtils.createUserPayloadFromConf(conf);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @InterfaceAudience.Private
+  public void fromUserPayload(UserPayload payload) {
+    try {
+      this.conf = TezUtils.createConfFromUserPayload(payload);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public static Builder newBuilder(String keyClass, String valClass) {
+    return new Builder(keyClass, valClass);
+  }
+
+  @InterfaceAudience.Public
+  @InterfaceStability.Evolving
+  public static class Builder implements SpecificConfigurer<Builder> {
+
+    private final Configuration conf = new Configuration(false);
+
+    /**
+     * Create a configuration builder for {@link org.apache.tez.runtime.library.output.UnorderedKVOutput}
+     *
+     * @param keyClassName         the key class name
+     * @param valueClassName       the value class name
+     */
+    @InterfaceAudience.Private
+    Builder(String keyClassName, String valueClassName) {
+      this();
+      Preconditions.checkNotNull(keyClassName, "Key class name cannot be null");
+      Preconditions.checkNotNull(valueClassName, "Value class name cannot be null");
+      setKeyClassName(keyClassName);
+      setValueClassName(valueClassName);
+    }
+
+    @InterfaceAudience.Private
+    Builder() {
+      Map<String, String> tezDefaults = ConfigUtils
+          .extractConfigurationMap(TezRuntimeConfiguration.getTezRuntimeConfigDefaults(),
+              UnorderedKVOutput.getConfigurationKeySet());
+      ConfigUtils.addConfigMapToConfiguration(this.conf, tezDefaults);
+      ConfigUtils.addConfigMapToConfiguration(this.conf, TezRuntimeConfiguration.getOtherConfigDefaults());
+    }
+
+    @InterfaceAudience.Private
+    Builder setKeyClassName(String keyClassName) {
+      Preconditions.checkNotNull(keyClassName, "Key class name cannot be null");
+      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, keyClassName);
+      return this;
+    }
+
+    @InterfaceAudience.Private
+    Builder setValueClassName(String valueClassName) {
+      Preconditions.checkNotNull(valueClassName, "Value class name cannot be null");
+      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, valueClassName);
+      return this;
+    }
+
+    @Override
+    public Builder setAdditionalConfiguration(String key, String value) {
+      Preconditions.checkNotNull(key, "Key cannot be null");
+      if (ConfigUtils.doesKeyQualify(key,
+          Lists.newArrayList(UnorderedKVOutput.getConfigurationKeySet(),
+              TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()),
+          TezRuntimeConfiguration.getAllowedPrefixes())) {
+        if (value == null) {
+          this.conf.unset(key);
+        } else {
+          this.conf.set(key, value);
+        }
+      }
+      return this;
+    }
+
+    @Override
+    public Builder setAdditionalConfiguration(Map<String, String> confMap) {
+      Preconditions.checkNotNull(confMap, "ConfMap cannot be null");
+      Map<String, String> map = ConfigUtils.extractConfigurationMap(confMap,
+          Lists.newArrayList(UnorderedKVOutput.getConfigurationKeySet(),
+              TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()), TezRuntimeConfiguration.getAllowedPrefixes());
+      ConfigUtils.addConfigMapToConfiguration(this.conf, map);
+      return this;
+    }
+
+    @Override
+    public Builder setFromConfiguration(Configuration conf) {
+      // Maybe ensure this is the first call ? Otherwise this can end up overriding other parameters
+      Preconditions.checkArgument(conf != null, "Configuration cannot be null");
+      Map<String, String> map = ConfigUtils.extractConfigurationMap(conf,
+          Lists.newArrayList(UnorderedKVOutput.getConfigurationKeySet(),
+              TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()), TezRuntimeConfiguration.getAllowedPrefixes());
+      ConfigUtils.addConfigMapToConfiguration(this.conf, map);
+      return this;
+    }
+
+    /**
+     * Set serialization class responsible for providing serializer/deserializer for keys.
+     *
+     * @param serializationClassName the serialization class name
+     * @return instance of the current builder
+     */
+    public Builder setKeySerializationClass(String serializationClassName) {
+      Preconditions.checkArgument(serializationClassName != null, "serializationClassName cannot be null");
+      String current = this.conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY); // null if unset
+      this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, // prepend; never append "null"
+          current == null ? serializationClassName : serializationClassName + "," + current);
+      return this;
+    }
+
+    /**
+     * Set serialization class responsible for providing serializer/deserializer for values.
+     *
+     * @param serializationClassName the serialization class name
+     * @return instance of the current builder
+     */
+    public Builder setValueSerializationClass(String serializationClassName) {
+      Preconditions.checkArgument(serializationClassName != null, "serializationClassName cannot be null");
+      String current = this.conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY); // null if unset
+      this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, // prepend; never append "null"
+          current == null ? serializationClassName : serializationClassName + "," + current);
+      return this;
+    }
+
+    public Builder setCompression(boolean enabled, @Nullable String compressionCodec) {
+      this.conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, enabled);
+      if (enabled && compressionCodec != null) {
+        this.conf
+            .set(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, compressionCodec);
+      }
+      return this;
+    }
+
+    /**
+     * Create the actual configuration instance.
+     *
+     * @return an instance of the Configuration
+     */
+    public UnorderedKVOutputConfigurer build() {
+      return new UnorderedKVOutputConfigurer(this.conf);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/469bf905/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfigurer.java
index ef93d9e..30e141c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfigurer.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfigurer.java
@@ -34,11 +34,11 @@ import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
-import org.apache.tez.runtime.library.output.OnFileUnorderedPartitionedKVOutput;
+import org.apache.tez.runtime.library.input.UnorderedKVInput;
+import org.apache.tez.runtime.library.output.UnorderedPartitionedKVOutput;
 
 /**
- * Configure payloads for the OnFileUnorderedPartitionedKVOutput and ShuffledUnorderedKVInput pair </p>
+ * Configure payloads for the UnorderedPartitionedKVOutput and UnorderedKVInput pair </p>
  *
  * Values will be picked up from tez-site if not specified, otherwise defaults from
  * {@link org.apache.tez.runtime.library.api.TezRuntimeConfiguration} will be used.
@@ -47,12 +47,12 @@ import org.apache.tez.runtime.library.output.OnFileUnorderedPartitionedKVOutput;
 @InterfaceStability.Evolving
 public class UnorderedPartitionedKVEdgeConfigurer extends HadoopKeyValuesBasedBaseEdgeConfigurer {
 
-  private final OnFileUnorderedPartitionedKVOutputConfigurer outputConf;
-  private final ShuffledUnorderedKVInputConfigurer inputConf;
+  private final UnorderedPartitionedKVOutputConfigurer outputConf;
+  private final UnorderedKVInputConfigurer inputConf;
 
   private UnorderedPartitionedKVEdgeConfigurer(
-      OnFileUnorderedPartitionedKVOutputConfigurer outputConfiguration,
-      ShuffledUnorderedKVInputConfigurer inputConfiguration) {
+      UnorderedPartitionedKVOutputConfigurer outputConfiguration,
+      UnorderedKVInputConfigurer inputConfiguration) {
     this.outputConf = outputConfiguration;
     this.inputConf = inputConfiguration;
 
@@ -98,7 +98,7 @@ public class UnorderedPartitionedKVEdgeConfigurer extends HadoopKeyValuesBasedBa
 
   @Override
   public String getOutputClassName() {
-    return OnFileUnorderedPartitionedKVOutput.class.getName();
+    return UnorderedPartitionedKVOutput.class.getName();
   }
 
   @Override
@@ -108,7 +108,7 @@ public class UnorderedPartitionedKVEdgeConfigurer extends HadoopKeyValuesBasedBa
 
   @Override
   public String getInputClassName() {
-    return ShuffledUnorderedKVInput.class.getName();
+    return UnorderedKVInput.class.getName();
   }
 
   /**
@@ -152,18 +152,18 @@ public class UnorderedPartitionedKVEdgeConfigurer extends HadoopKeyValuesBasedBa
   @InterfaceStability.Evolving
   public static class Builder extends HadoopKeyValuesBasedBaseEdgeConfigurer.Builder<Builder> {
 
-    private final OnFileUnorderedPartitionedKVOutputConfigurer.Builder outputBuilder =
-        new OnFileUnorderedPartitionedKVOutputConfigurer.Builder();
-    private final OnFileUnorderedPartitionedKVOutputConfigurer.SpecificBuilder<UnorderedPartitionedKVEdgeConfigurer.Builder>
+    private final UnorderedPartitionedKVOutputConfigurer.Builder outputBuilder =
+        new UnorderedPartitionedKVOutputConfigurer.Builder();
+    private final UnorderedPartitionedKVOutputConfigurer.SpecificBuilder<UnorderedPartitionedKVEdgeConfigurer.Builder>
         specificOutputBuilder =
-        new OnFileUnorderedPartitionedKVOutputConfigurer.SpecificBuilder<UnorderedPartitionedKVEdgeConfigurer.Builder>(
+        new UnorderedPartitionedKVOutputConfigurer.SpecificBuilder<UnorderedPartitionedKVEdgeConfigurer.Builder>(
             this, outputBuilder);
 
-    private final ShuffledUnorderedKVInputConfigurer.Builder inputBuilder =
-        new ShuffledUnorderedKVInputConfigurer.Builder();
-    private final ShuffledUnorderedKVInputConfigurer.SpecificBuilder<UnorderedPartitionedKVEdgeConfigurer.Builder>
+    private final UnorderedKVInputConfigurer.Builder inputBuilder =
+        new UnorderedKVInputConfigurer.Builder();
+    private final UnorderedKVInputConfigurer.SpecificBuilder<UnorderedPartitionedKVEdgeConfigurer.Builder>
         specificInputBuilder =
-        new ShuffledUnorderedKVInputConfigurer.SpecificBuilder<UnorderedPartitionedKVEdgeConfigurer.Builder>(
+        new UnorderedKVInputConfigurer.SpecificBuilder<UnorderedPartitionedKVEdgeConfigurer.Builder>(
             this, inputBuilder);
 
     @InterfaceAudience.Private
@@ -233,7 +233,7 @@ public class UnorderedPartitionedKVEdgeConfigurer extends HadoopKeyValuesBasedBa
      *
      * @return a builder to configure the output
      */
-    public OnFileUnorderedPartitionedKVOutputConfigurer.SpecificBuilder<Builder> configureOutput() {
+    public UnorderedPartitionedKVOutputConfigurer.SpecificBuilder<Builder> configureOutput() {
       return specificOutputBuilder;
     }
 
@@ -241,7 +241,7 @@ public class UnorderedPartitionedKVEdgeConfigurer extends HadoopKeyValuesBasedBa
      * Configure the specific input
      * @return a builder to configure the input
      */
-    public ShuffledUnorderedKVInputConfigurer.SpecificBuilder<Builder> configureInput() {
+    public UnorderedKVInputConfigurer.SpecificBuilder<Builder> configureInput() {
       return specificInputBuilder;
     }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/469bf905/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVOutputConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVOutputConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVOutputConfigurer.java
new file mode 100644
index 0000000..8033c95
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVOutputConfigurer.java
@@ -0,0 +1,304 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.tez.runtime.library.conf;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Map;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.output.UnorderedPartitionedKVOutput;
+
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+/**
+ * Configure {@link org.apache.tez.runtime.library.output.UnorderedPartitionedKVOutput} </p>
+ *
+ * Values will be picked up from tez-site if not specified, otherwise defaults from
+ * {@link org.apache.tez.runtime.library.api.TezRuntimeConfiguration} will be used.
+ */
+public class UnorderedPartitionedKVOutputConfigurer {
+  /**
+   * Configure parameters which are specific to the Output.
+   */
+  @InterfaceAudience.Private
+  public static interface SpecificConfigurer<T> extends BaseConfigurer<T> {
+    /**
+     * Set the buffer size to use
+     *
+     * @param availableBufferSize the size of the buffer in MB
+     * @return instance of the current builder
+     */
+    public T setAvailableBufferSize(int availableBufferSize);
+  }
+
+  @SuppressWarnings("rawtypes")
+  @InterfaceAudience.Public
+  @InterfaceStability.Evolving
+  public static class SpecificBuilder<E extends HadoopKeyValuesBasedBaseEdgeConfigurer.Builder> implements
+      SpecificConfigurer<SpecificBuilder> {
+
+    private final E edgeBuilder;
+    private final Builder builder;
+
+    SpecificBuilder(E edgeBuilder, Builder builder) {
+      this.edgeBuilder = edgeBuilder;
+      this.builder = builder;
+    }
+
+    @Override
+    public SpecificBuilder<E> setAvailableBufferSize(int availableBufferSize) {
+      builder.setAvailableBufferSize(availableBufferSize);
+      return this;
+    }
+
+    @Override
+    public SpecificBuilder setAdditionalConfiguration(String key, String value) {
+      builder.setAdditionalConfiguration(key, value);
+      return this;
+    }
+
+    @Override
+    public SpecificBuilder setAdditionalConfiguration(Map<String, String> confMap) {
+      builder.setAdditionalConfiguration(confMap);
+      return this;
+    }
+
+    @Override
+    public SpecificBuilder setFromConfiguration(Configuration conf) {
+      builder.setFromConfiguration(conf);
+      return this;
+    }
+
+    public E done() {
+      return edgeBuilder;
+    }
+  }
+
+  @InterfaceAudience.Private
+  @VisibleForTesting
+  Configuration conf;
+
+  @InterfaceAudience.Private
+  @VisibleForTesting
+  UnorderedPartitionedKVOutputConfigurer() {
+  }
+
+  private UnorderedPartitionedKVOutputConfigurer(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Get a UserPayload representation of the Configuration
+   * @return a {@link org.apache.tez.dag.api.UserPayload} instance
+   */
+  public UserPayload toUserPayload() {
+    try {
+      return TezUtils.createUserPayloadFromConf(conf);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @InterfaceAudience.Private
+  public void fromUserPayload(UserPayload payload) {
+    try {
+      this.conf = TezUtils.createConfFromUserPayload(payload);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+
+  public static Builder newBuilder(String keyClass, String valClass, String partitionerClassName) {
+    return newBuilder(keyClass, valClass, partitionerClassName, null);
+  }
+
+  public static Builder newBuilder(String keyClass, String valClass, String partitionerClassName,
+                                   Map<String, String> partitionerConf) {
+    return new Builder(keyClass, valClass, partitionerClassName, partitionerConf);
+  }
+
+  @InterfaceAudience.Public
+  @InterfaceStability.Evolving
+  public static class Builder implements SpecificConfigurer<Builder> {
+
+    private final Configuration conf = new Configuration(false);
+
+    /**
+     * Create a configuration builder for {@link org.apache.tez.runtime.library.output.UnorderedPartitionedKVOutput}
+     *
+     * @param keyClassName         the key class name
+     * @param valueClassName       the value class name
+     * @param partitionerClassName the partitioner class name
+     * @param partitionerConf      configuration for the partitioner specified as a map of key-value
+     *                             pairs. This can be null
+     */
+    @InterfaceAudience.Private
+    Builder(String keyClassName, String valueClassName, String partitionerClassName,
+                   Map<String, String> partitionerConf) {
+      this();
+      Preconditions.checkNotNull(keyClassName, "Key class name cannot be null");
+      Preconditions.checkNotNull(valueClassName, "Value class name cannot be null");
+      Preconditions.checkNotNull(partitionerClassName, "Partitioner class name cannot be null");
+      setKeyClassName(keyClassName);
+      setValueClassName(valueClassName);
+      setPartitioner(partitionerClassName, partitionerConf);
+    }
+
+    @InterfaceAudience.Private
+    Builder() {
+      Map<String, String> tezDefaults = ConfigUtils
+          .extractConfigurationMap(TezRuntimeConfiguration.getTezRuntimeConfigDefaults(),
+              UnorderedPartitionedKVOutput.getConfigurationKeySet());
+      ConfigUtils.addConfigMapToConfiguration(this.conf, tezDefaults);
+      ConfigUtils.addConfigMapToConfiguration(this.conf, TezRuntimeConfiguration.getOtherConfigDefaults());
+    }
+
+    @InterfaceAudience.Private
+    Builder setKeyClassName(String keyClassName) {
+      Preconditions.checkNotNull(keyClassName, "Key class name cannot be null");
+      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, keyClassName);
+      return this;
+    }
+
+    @InterfaceAudience.Private
+    Builder setValueClassName(String valueClassName) {
+      Preconditions.checkNotNull(valueClassName, "Value class name cannot be null");
+      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, valueClassName);
+      return this;
+    }
+
+    @InterfaceAudience.Private
+    Builder setPartitioner(String partitionerClassName, Map<String, String> partitionerConf) {
+      Preconditions.checkNotNull(partitionerClassName, "Partitioner class name cannot be null");
+      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, partitionerClassName);
+      if (partitionerConf != null) {
+        // Merging the confs for now. Change to be specific in the future.
+        ConfigUtils.mergeConfsWithExclusions(this.conf, partitionerConf,
+            TezRuntimeConfiguration.getRuntimeConfigKeySet());
+      }
+      return this;
+    }
+
+    @Override
+    public Builder setAvailableBufferSize(int availableBufferSize) {
+      this.conf
+          .setInt(TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB, availableBufferSize);
+      return this;
+    }
+
+    @Override
+    public Builder setAdditionalConfiguration(String key, String value) {
+      Preconditions.checkNotNull(key, "Key cannot be null");
+      if (ConfigUtils.doesKeyQualify(key,
+          Lists.newArrayList(UnorderedPartitionedKVOutput.getConfigurationKeySet(),
+              TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()),
+          TezRuntimeConfiguration.getAllowedPrefixes())) {
+        if (value == null) {
+          this.conf.unset(key);
+        } else {
+          this.conf.set(key, value);
+        }
+      }
+      return this;
+    }
+
+    @Override
+    public Builder setAdditionalConfiguration(Map<String, String> confMap) {
+      Preconditions.checkNotNull(confMap, "ConfMap cannot be null");
+      Map<String, String> map = ConfigUtils.extractConfigurationMap(confMap,
+          Lists.newArrayList(UnorderedPartitionedKVOutput.getConfigurationKeySet(),
+              TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()), TezRuntimeConfiguration.getAllowedPrefixes());
+      ConfigUtils.addConfigMapToConfiguration(this.conf, map);
+      return this;
+    }
+
+    @Override
+    public Builder setFromConfiguration(Configuration conf) {
+      // Maybe ensure this is the first call ? Otherwise this can end up overriding other parameters
+      Preconditions.checkArgument(conf != null, "Configuration cannot be null");
+      Map<String, String> map = ConfigUtils.extractConfigurationMap(conf,
+          Lists.newArrayList(UnorderedPartitionedKVOutput.getConfigurationKeySet(),
+              TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()), TezRuntimeConfiguration.getAllowedPrefixes());
+      ConfigUtils.addConfigMapToConfiguration(this.conf, map);
+      return this;
+    }
+
+    public Builder setCompression(boolean enabled, @Nullable String compressionCodec) {
+      this.conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, enabled);
+      if (enabled && compressionCodec != null) {
+        this.conf
+            .set(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, compressionCodec);
+      }
+      return this;
+    }
+
+    /**
+     * Set serialization class responsible for providing serializer/deserializer for keys.
+     *
+     * @param serializationClassName class name to prepend to the configured serializations list
+     * @return instance of the current builder
+     */
+    public Builder setKeySerializationClass(String serializationClassName) {
+      Preconditions.checkArgument(serializationClassName != null,
+          "serializationClassName cannot be null");
+      this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + ","
+          + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
+      return this;
+    }
+
+    /**
+     * Set serialization class responsible for providing serializer/deserializer for values.
+     *
+     * @param serializationClassName class name to prepend to the configured serializations list
+     * @return instance of the current builder
+     */
+    public Builder setValueSerializationClass(String serializationClassName) {
+      Preconditions.checkArgument(serializationClassName != null,
+          "serializationClassName cannot be null");
+      this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + ","
+          + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
+      return this;
+    }
+
+    /**
+     * Create the actual configuration instance.
+     *
+     * @return an {@link UnorderedPartitionedKVOutputConfigurer} holding this builder's configuration
+     */
+    public UnorderedPartitionedKVOutputConfigurer build() {
+      return new UnorderedPartitionedKVOutputConfigurer(this.conf);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/469bf905/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedUnpartitionedKVEdgeConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedUnpartitionedKVEdgeConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedUnpartitionedKVEdgeConfigurer.java
deleted file mode 100644
index c10e57a..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedUnpartitionedKVEdgeConfigurer.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * *
- *  * Licensed to the Apache Software Foundation (ASF) under one
- *  * or more contributor license agreements.  See the NOTICE file
- *  * distributed with this work for additional information
- *  * regarding copyright ownership.  The ASF licenses this file
- *  * to you under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance
- *  * with the License.  You may obtain a copy of the License at
- *  *
- *  *     http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.tez.runtime.library.conf;
-
-import javax.annotation.Nullable;
-
-import java.util.Map;
-
-import com.google.common.base.Preconditions;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
-import org.apache.tez.dag.api.EdgeProperty;
-import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.api.OutputDescriptor;
-import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
-import org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput;
-
-/**
- * Configure payloads for the OnFileUnorderedKVOutput and ShuffledUnorderedKVInput pair </p>
- *
- * Values will be picked up from tez-site if not specified, otherwise defaults from
- * {@link org.apache.tez.runtime.library.api.TezRuntimeConfiguration} will be used.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public class UnorderedUnpartitionedKVEdgeConfigurer extends HadoopKeyValuesBasedBaseEdgeConfigurer {
-  private final OnFileUnorderedKVOutputConfigurer outputConf;
-  private final ShuffledUnorderedKVInputConfigurer inputConf;
-
-  private UnorderedUnpartitionedKVEdgeConfigurer(
-      OnFileUnorderedKVOutputConfigurer outputConfiguration,
-      ShuffledUnorderedKVInputConfigurer inputConfiguration) {
-    this.outputConf = outputConfiguration;
-    this.inputConf = inputConfiguration;
-
-  }
-
-  /**
-   * Create a builder to configure the relevant Input and Output
-   * @param keyClassName the key class name
-   * @param valueClassName the value class name
-   * @return a builder to configure the edge
-   */
-  public static Builder newBuilder(String keyClassName, String valueClassName) {
-    return new Builder(keyClassName, valueClassName);
-  }
-
-  @Override
-  public UserPayload getOutputPayload() {
-    return outputConf.toUserPayload();
-  }
-
-  @Override
-  public String getOutputClassName() {
-    return OnFileUnorderedKVOutput.class.getName();
-  }
-
-  @Override
-  public UserPayload getInputPayload() {
-    return inputConf.toUserPayload();
-  }
-
-  @Override
-  public String getInputClassName() {
-    return ShuffledUnorderedKVInput.class.getName();
-  }
-
-  /**
-   * This is a convenience method for the typical usage of this edge, and creates an instance of
-   * {@link org.apache.tez.dag.api.EdgeProperty} which is likely to be used. </p>
-   * If custom edge properties are required, the methods to get the relevant payloads should be
-   * used. </p>
-   * * In this case - DataMovementType.BROADCAST, EdgeProperty.DataSourceType.PERSISTED,
-   * EdgeProperty.SchedulingType.SEQUENTIAL
-   *
-   * @return an {@link org.apache.tez.dag.api.EdgeProperty} instance
-   */
-  public EdgeProperty createDefaultBroadcastEdgeProperty() {
-    EdgeProperty edgeProperty = new EdgeProperty(EdgeProperty.DataMovementType.BROADCAST,
-        EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL,
-        new OutputDescriptor(
-            getOutputClassName()).setUserPayload(getOutputPayload()),
-        new InputDescriptor(
-            getInputClassName()).setUserPayload(getInputPayload()));
-    return edgeProperty;
-  }
-
-  /**
-   * This is a convenience method for the typical usage of this edge, and creates an instance of
-   * {@link org.apache.tez.dag.api.EdgeProperty} which is likely to be used. </p>
-   * If custom edge properties are required, the methods to get the relevant payloads should be
-   * used. </p>
-   * * In this case - DataMovementType.ONE_TO_ONE, EdgeProperty.DataSourceType.PERSISTED,
-   * EdgeProperty.SchedulingType.SEQUENTIAL
-   *
-   * @return an {@link org.apache.tez.dag.api.EdgeProperty} instance
-   */
-  public EdgeProperty createDefaultOneToOneEdgeProperty() {
-    EdgeProperty edgeProperty = new EdgeProperty(EdgeProperty.DataMovementType.ONE_TO_ONE,
-        EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL,
-        new OutputDescriptor(
-            getOutputClassName()).setUserPayload(getOutputPayload()),
-        new InputDescriptor(
-            getInputClassName()).setUserPayload(getInputPayload()));
-    return edgeProperty;
-  }
-
-  /**
-   * This is a convenience method for creating an Edge descriptor based on the specified
-   * EdgeManagerDescriptor.
-   *
-   * @param edgeManagerDescriptor the custom edge specification
-   * @return an {@link org.apache.tez.dag.api.EdgeProperty} instance
-   */
-  public EdgeProperty createDefaultCustomEdgeProperty(EdgeManagerPluginDescriptor edgeManagerDescriptor) {
-    Preconditions.checkNotNull(edgeManagerDescriptor, "EdgeManagerDescriptor cannot be null");
-    EdgeProperty edgeProperty =
-        new EdgeProperty(edgeManagerDescriptor, EdgeProperty.DataSourceType.PERSISTED,
-            EdgeProperty.SchedulingType.SEQUENTIAL,
-            new OutputDescriptor(getOutputClassName()).setUserPayload(getOutputPayload()),
-            new InputDescriptor(getInputClassName()).setUserPayload(getInputPayload()));
-    return edgeProperty;
-  }
-
-  @InterfaceAudience.Public
-  @InterfaceStability.Evolving
-  public static class Builder extends HadoopKeyValuesBasedBaseEdgeConfigurer.Builder<Builder> {
-
-    private final OnFileUnorderedKVOutputConfigurer.Builder outputBuilder =
-        new OnFileUnorderedKVOutputConfigurer.Builder();
-    private final OnFileUnorderedKVOutputConfigurer.SpecificBuilder<UnorderedUnpartitionedKVEdgeConfigurer.Builder>
-        specificOutputBuilder =
-        new OnFileUnorderedKVOutputConfigurer.SpecificBuilder<UnorderedUnpartitionedKVEdgeConfigurer.Builder>(
-            this, outputBuilder);
-
-    private final ShuffledUnorderedKVInputConfigurer.Builder inputBuilder =
-        new ShuffledUnorderedKVInputConfigurer.Builder();
-    private final ShuffledUnorderedKVInputConfigurer.SpecificBuilder<UnorderedUnpartitionedKVEdgeConfigurer.Builder>
-        specificInputBuilder =
-        new ShuffledUnorderedKVInputConfigurer.SpecificBuilder<UnorderedUnpartitionedKVEdgeConfigurer.Builder>(
-            this, inputBuilder);
-
-    @InterfaceAudience.Private
-    Builder(String keyClassName, String valueClassName) {
-      outputBuilder.setKeyClassName(keyClassName);
-      outputBuilder.setValueClassName(valueClassName);
-      inputBuilder.setKeyClassName(keyClassName);
-      inputBuilder.setValueClassName(valueClassName);
-    }
-
-    @Override
-    public Builder setCompression(boolean enabled, @Nullable String compressionCodec) {
-      outputBuilder.setCompression(enabled, compressionCodec);
-      inputBuilder.setCompression(enabled, compressionCodec);
-      return this;
-    }
-
-    @Override
-    public Builder setAdditionalConfiguration(String key, String value) {
-      outputBuilder.setAdditionalConfiguration(key, value);
-      inputBuilder.setAdditionalConfiguration(key, value);
-      return this;
-    }
-
-    @Override
-    public Builder setAdditionalConfiguration(Map<String, String> confMap) {
-      outputBuilder.setAdditionalConfiguration(confMap);
-      inputBuilder.setAdditionalConfiguration(confMap);
-      return this;
-    }
-
-    @Override
-    public Builder setFromConfiguration(Configuration conf) {
-      outputBuilder.setFromConfiguration(conf);
-      inputBuilder.setFromConfiguration(conf);
-      return this;
-    }
-
-    /**
-     * Set serialization class responsible for providing serializer/deserializer for key/value and
-     * the corresponding comparator class to be used as key comparator.
-     *
-     * @param serializationClassName
-     * @return
-     */
-    public Builder setKeySerializationClass(String serializationClassName) {
-      outputBuilder.setKeySerializationClass(serializationClassName);
-      inputBuilder.setKeySerializationClass(serializationClassName);
-      return this;
-    }
-
-    /**
-     * Set serialization class responsible for providing serializer/deserializer for values.
-     *
-     * @param serializationClassName
-     * @return
-     */
-    public Builder setValueSerializationClass(String serializationClassName) {
-      outputBuilder.setValueSerializationClass(serializationClassName);
-      inputBuilder.setValueSerializationClass(serializationClassName);
-      return this;
-    }
-
-    /**
-     * Configure the specific output
-     * @return a builder to configure the output
-     */
-    public OnFileUnorderedKVOutputConfigurer.SpecificBuilder<Builder> configureOutput() {
-      return specificOutputBuilder;
-    }
-
-    /**
-     * Configure the specific input
-     * @return a builder to configure the input
-     */
-    public ShuffledUnorderedKVInputConfigurer.SpecificBuilder<Builder> configureInput() {
-      return specificInputBuilder;
-    }
-
-    /**
-     * Build and return an instance of the configuration
-     * @return an instance of the acatual configuration
-     */
-    public UnorderedUnpartitionedKVEdgeConfigurer build() {
-      return new UnorderedUnpartitionedKVEdgeConfigurer(outputBuilder.build(), inputBuilder.build());
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/469bf905/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
deleted file mode 100644
index 65938a8..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.runtime.library.input;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.LogicalInput;
-import org.apache.tez.runtime.api.InputContext;
-import org.apache.tez.runtime.library.common.localshuffle.LocalShuffle;
-
-/**
- * <code>LocalMergedInput</code> in an {@link LogicalInput} which shuffles intermediate
- * sorted data, merges them and provides key/<values> to the consumer. 
- */
-public class LocalMergedInput extends ShuffledMergedInputLegacy {
-
-  public LocalMergedInput(InputContext inputContext, int numPhysicalInputs) {
-    super(inputContext, numPhysicalInputs);
-  }
-
-  @Override
-  public List<Event> initialize() throws IOException {
-    getContext().requestInitialMemory(0l, null); // mandatory call.
-    getContext().inputIsReady();
-    this.conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
-
-    if (getNumPhysicalInputs() == 0) {
-      return Collections.emptyList();
-    }
-
-    LocalShuffle localShuffle = new LocalShuffle(getContext(), conf, getNumPhysicalInputs());
-    rawIter = localShuffle.run();
-    createValuesIterator();
-    return Collections.emptyList();
-  }
-  
-  @Override
-  public void start() throws IOException {
-  }
-
-  @Override
-  public List<Event> close() throws IOException {
-    if (getNumPhysicalInputs() != 0) {
-      rawIter.close();
-    }
-    return Collections.emptyList();
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/469bf905/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedInputLegacy.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedInputLegacy.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedInputLegacy.java
new file mode 100644
index 0000000..e2960f8
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedInputLegacy.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.util.Progress;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+
+@Private
+public class OrderedGroupedInputLegacy extends OrderedGroupedKVInput {
+
+  private final Progress progress = new Progress(); // returned, marked complete, by the empty iterator
+
+  public OrderedGroupedInputLegacy(InputContext inputContext, int numPhysicalInputs) {
+    super(inputContext, numPhysicalInputs);
+  }
+
+  @Private
+  public TezRawKeyValueIterator getIterator() throws IOException, InterruptedException, TezException {
+    // With zero physical inputs, return an empty iterator right away; otherwise wait for input below.
+    synchronized(this) {
+    if (getNumPhysicalInputs() == 0) {
+      return new TezRawKeyValueIterator() {
+        @Override
+        public DataInputBuffer getKey() throws IOException {
+          throw new RuntimeException("No data available in Input");
+        }
+
+        @Override
+        public DataInputBuffer getValue() throws IOException {
+          throw new RuntimeException("No data available in Input");
+        }
+
+        @Override
+        public boolean next() throws IOException {
+          return false; // never advances: there are no records
+        }
+
+        @Override
+        public void close() throws IOException {
+        }
+
+        @Override
+        public Progress getProgress() {
+          progress.complete();
+          return progress;
+        }
+      };
+    }
+    }
+
+    waitForInputReady(); // may block; stores the iterator from shuffle.waitForInput() in rawIter
+    synchronized(this) {
+      return rawIter;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/469bf905/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
new file mode 100644
index 0000000..c1982ee
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
@@ -0,0 +1,349 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.input;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.TezRuntimeFrameworkConfigs;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.runtime.api.AbstractLogicalInput;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
+import org.apache.tez.runtime.library.common.ValuesIterator;
+import org.apache.tez.runtime.library.common.shuffle.impl.Shuffle;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+
+import com.google.common.base.Preconditions;
+
+
+/**
+ * {@link OrderedGroupedKVInput} is an {@link AbstractLogicalInput} which shuffles
+ * intermediate sorted data, merges them and provides key/<values> to the
+ * consumer. This is typically used to bring one partition of a set of partitioned
+ * distributed data to one consumer. The shuffle operation brings all partitions
+ * to one place. These partitions are assumed to be sorted and are merge-sorted
+ * into a single input view.
+ *
+ * The Copy and Merge will be triggered by the initialization - which is handled
+ * by the Tez framework. Input is not consumable until the Copy and Merge are
+ * complete. Methods are provided to check for this, as well as to wait for
+ * completion. Attempting to get a reader on a non-complete input will block.
+ *
+ */
+public class OrderedGroupedKVInput extends AbstractLogicalInput {
+
+  static final Log LOG = LogFactory.getLog(OrderedGroupedKVInput.class);
+
+  protected TezRawKeyValueIterator rawIter = null; // set by waitForInputReady()
+  protected Configuration conf;
+  protected Shuffle shuffle; // created in start(); stays null when there are 0 physical inputs
+  protected MemoryUpdateCallbackHandler memoryUpdateCallbackHandler;
+  private final BlockingQueue<Event> pendingEvents = new LinkedBlockingQueue<Event>(); // events seen before start()
+  private long firstEventReceivedTime = -1; // -1 until the first pre-start event arrives
+  @SuppressWarnings("rawtypes")
+  protected ValuesIterator vIter;
+
+  private TezCounter inputKeyCounter;
+  private TezCounter inputValueCounter;
+
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
+
+  public OrderedGroupedKVInput(InputContext inputContext, int numPhysicalInputs) {
+    super(inputContext, numPhysicalInputs);
+  }
+
+  @Override
+  public synchronized List<Event> initialize() throws IOException {
+    this.conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
+
+    if (this.getNumPhysicalInputs() == 0) {
+      getContext().requestInitialMemory(0l, null); // still issued with a zero request and no callback
+      isStarted.set(true); // nothing to shuffle, so a later start() is a no-op
+      getContext().inputIsReady();
+      LOG.info("input fetch not required since there are 0 physical inputs for input vertex: "
+          + getContext().getSourceVertexName());
+      return Collections.emptyList();
+    }
+
+    long initialMemoryRequest = Shuffle.getInitialMemoryRequirement(conf,
+        getContext().getTotalMemoryAvailableToTask());
+    this.memoryUpdateCallbackHandler = new MemoryUpdateCallbackHandler();
+    getContext().requestInitialMemory(initialMemoryRequest, memoryUpdateCallbackHandler);
+
+    this.inputKeyCounter = getContext().getCounters().findCounter(TaskCounter.REDUCE_INPUT_GROUPS);
+    this.inputValueCounter = getContext().getCounters().findCounter(
+        TaskCounter.REDUCE_INPUT_RECORDS);
+    this.conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, getContext().getWorkDirs());
+
+    return Collections.emptyList();
+  }
+
+  @Override
+  public synchronized void start() throws IOException {
+    if (!isStarted.get()) {
+      memoryUpdateCallbackHandler.validateUpdateReceived();
+      // Start the shuffle - copy and merge
+      shuffle = new Shuffle(getContext(), conf, getNumPhysicalInputs(), memoryUpdateCallbackHandler.getMemoryAssigned());
+      shuffle.run();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Initialized the handlers in shuffle..Safe to start processing..");
+      }
+      List<Event> pending = new LinkedList<Event>();
+      pendingEvents.drainTo(pending); // events queued by handleEvents() before the input was started
+      if (pending.size() > 0) {
+        LOG.info("NoAutoStart delay in processing first event: "
+            + (System.currentTimeMillis() - firstEventReceivedTime));
+        shuffle.handleEvents(pending);
+      }
+      isStarted.set(true); // from here on handleEvents() forwards straight to shuffle
+    }
+  }
+
+  /**
+   * Check if the input is ready for consumption. The input must be started first.
+   *
+   * @return true if the input is ready for consumption, or if an error occurred
+   *         while fetching the input. false if the shuffle and merge are
+   *         still in progress
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  public synchronized boolean isInputReady() throws IOException, InterruptedException, TezException {
+    Preconditions.checkState(isStarted.get(), "Must start input before invoking this method");
+    if (getNumPhysicalInputs() == 0) {
+      return true;
+    }
+    return shuffle.isInputReady();
+  }
+
+  /**
+   * Waits for the input to become ready for consumption
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public void waitForInputReady() throws IOException, InterruptedException, TezException {
+    // Cannot synchronize entire method since this is called from user code and can block.
+    Shuffle localShuffleCopy = null;
+    synchronized (this) {
+      Preconditions.checkState(isStarted.get(), "Must start input before invoking this method");
+      if (getNumPhysicalInputs() == 0) {
+        return;
+      }
+      localShuffleCopy = shuffle;
+    }
+
+    TezRawKeyValueIterator localRawIter = localShuffleCopy.waitForInput(); // called without holding the lock
+    synchronized(this) {
+      rawIter = localRawIter;
+      createValuesIterator();
+    }
+  }
+
+  @Override
+  public synchronized List<Event> close() throws IOException {
+    if (this.getNumPhysicalInputs() != 0 && rawIter != null) {
+      rawIter.close();
+    }
+    if (shuffle != null) {
+      shuffle.shutdown();
+    }
+    return Collections.emptyList();
+  }
+
+  /**
+   * Get a KVReader for the Input. This method will block until the input is
+   * ready - i.e. the copy and merge stages are complete. Users can use the
+   * isInputReady method to check if the input is ready, which gives an
+   * indication of whether this method will block or not.
+   *
+   * NOTE: All values for the current K-V pair must be read prior to invoking
+   * moveToNext. Once moveToNext() is called, the valueIterator from the
+   * previous K-V pair will throw an Exception
+   *
+   * @return a KVReader over the sorted input.
+   */
+  @Override
+  public KeyValuesReader getReader() throws IOException, TezException {
+    // Cannot synchronize entire method since this is called from user code and can block.
+    TezRawKeyValueIterator rawIterLocal;
+    synchronized (this) {
+      rawIterLocal = rawIter;
+      if (getNumPhysicalInputs() == 0) {
+        return new KeyValuesReader() {
+          @Override
+          public boolean next() throws IOException {
+            return false;
+          }
+
+          @Override
+          public Object getCurrentKey() throws IOException {
+            throw new RuntimeException("No data available in Input");
+          }
+
+          @Override
+          public Iterable<Object> getCurrentValues() throws IOException {
+            throw new RuntimeException("No data available in Input");
+          }
+        };
+      }
+    }
+    if (rawIterLocal == null) { // iterator not produced yet: wait for the shuffle
+      try {
+        waitForInputReady();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IOException("Interrupted while waiting for input ready", e);
+      }
+    }
+    @SuppressWarnings("rawtypes")
+    ValuesIterator valuesIter = null;
+    synchronized(this) {
+      valuesIter = vIter;
+    }
+    return new OrderedGroupedKeyValuesReader(valuesIter);
+  }
+
+  @Override
+  public void handleEvents(List<Event> inputEvents) throws IOException {
+    synchronized (this) {
+      if (getNumPhysicalInputs() == 0) {
+        throw new RuntimeException("No input events expected as numInputs is 0");
+      }
+      if (!isStarted.get()) {
+        if (firstEventReceivedTime == -1) {
+          firstEventReceivedTime = System.currentTimeMillis();
+        }
+        pendingEvents.addAll(inputEvents); // replayed to shuffle by start()
+        return;
+      }
+    }
+    shuffle.handleEvents(inputEvents); // forwarded outside the lock
+  }
+
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  protected synchronized void createValuesIterator()
+      throws IOException {
+    // Not used by ReduceProcessor
+    vIter = new ValuesIterator(rawIter,
+        (RawComparator) ConfigUtils.getIntermediateInputKeyComparator(conf),
+        ConfigUtils.getIntermediateInputKeyClass(conf),
+        ConfigUtils.getIntermediateInputValueClass(conf), conf, inputKeyCounter, inputValueCounter);
+
+  }
+
+  @SuppressWarnings("rawtypes")
+  public RawComparator getInputKeyComparator() {
+    return (RawComparator) ConfigUtils.getIntermediateInputKeyComparator(conf);
+  }
+
+  @SuppressWarnings("rawtypes")
+  private static class OrderedGroupedKeyValuesReader extends KeyValuesReader {
+
+    private final ValuesIterator valuesIter;
+
+    OrderedGroupedKeyValuesReader(ValuesIterator valuesIter) {
+      this.valuesIter = valuesIter;
+    }
+
+    @Override
+    public boolean next() throws IOException {
+      return valuesIter.moveToNext();
+    }
+
+    @Override
+    public Object getCurrentKey() throws IOException {
+      return valuesIter.getKey();
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public Iterable<Object> getCurrentValues() throws IOException {
+      return valuesIter.getValues();
+    }
+  };
+
+
+  private static final Set<String> confKeys = new HashSet<String>();
+
+  static {
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINE_MIN_SPILLS);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_ENABLED);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_MAX_CONNECTIONS);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMTOMEM_SEGMENTS);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_BUFFER_PERCENT);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_GROUP_COMPARATOR_CLASS);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_SECONDARY_COMPARATOR_CLASS);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH);
+    confKeys.add(TezConfiguration.TEZ_AM_COUNTERS_MAX_KEYS);
+    confKeys.add(TezConfiguration.TEZ_AM_COUNTERS_GROUP_NAME_MAX_KEYS);
+    confKeys.add(TezConfiguration.TEZ_AM_COUNTERS_NAME_MAX_KEYS);
+    confKeys.add(TezConfiguration.TEZ_AM_COUNTERS_GROUPS_MAX_KEYS);
+  }
+
+  // TODO Maybe add helper methods to extract keys
+  // TODO Maybe add constants or an Enum to access the keys
+
+  @InterfaceAudience.Private
+  public static Set<String> getConfigurationKeySet() {
+    return Collections.unmodifiableSet(confKeys); // read-only view over the static key set
+  }
+
+}


Mime
View raw message