tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject [6/6] git commit: TEZ-1132. Consistent naming of Input and Outputs (bikas)
Date Sat, 16 Aug 2014 03:05:27 GMT
TEZ-1132. Consistent naming of Input and Outputs (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/469bf905
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/469bf905
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/469bf905

Branch: refs/heads/master
Commit: 469bf9052f7a51e67c82b4db300d71992d5fd874
Parents: 4714376
Author: Bikas Saha <bikas@apache.org>
Authored: Fri Aug 15 20:05:26 2014 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Fri Aug 15 20:05:26 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/tez/client/PreWarmVertex.java    | 166 -------
 .../java/org/apache/tez/client/TezClient.java   |   1 +
 .../tez/common/security/DAGAccessControls.java  |   2 +
 .../org/apache/tez/dag/api/PreWarmVertex.java   | 163 ++++++
 .../tez/dag/api/client/rpc/package-info.java    |  22 +
 .../org/apache/tez/client/TestTezClient.java    |   1 +
 .../tez/common/TezContainerLogAppender.java     |   2 -
 .../org/apache/tez/common/package-info.java     |  22 +
 .../apache/tez/dag/records/package-info.java    |  22 +
 .../org/apache/tez/dag/utils/package-info.java  |  22 +
 .../runtime/common/resources/package-info.java  |  22 +
 .../tez/examples/SimpleSessionExample.java      |   2 +-
 .../mapreduce/processor/map/MapProcessor.java   |   6 +-
 .../processor/reduce/ReduceProcessor.java       |  14 +-
 .../tez/mapreduce/input/LocalMergedInput.java   |  68 +++
 .../output/LocalOnFileSorterOutput.java         |  68 +++
 .../processor/map/TestMapProcessor.java         |   2 +-
 .../processor/reduce/TestReduceProcessor.java   |   4 +-
 .../readers/ShuffledUnorderedKVReader.java      | 202 --------
 .../common/readers/UnorderedKVReader.java       | 202 ++++++++
 .../conf/OnFileSortedOutputConfigurer.java      | 418 ----------------
 .../conf/OnFileUnorderedKVOutputConfigurer.java | 259 ----------
 ...eUnorderedPartitionedKVOutputConfigurer.java | 304 ------------
 .../conf/OrderedGroupedKVInputConfigurer.java   | 492 +++++++++++++++++++
 .../OrderedPartitionedKVEdgeConfigurer.java     |  34 +-
 .../OrderedPartitionedKVOutputConfigurer.java   | 418 ++++++++++++++++
 .../conf/ShuffledMergedInputConfigurer.java     | 492 -------------------
 .../ShuffledUnorderedKVInputConfigurer.java     | 334 -------------
 .../library/conf/UnorderedKVEdgeConfigurer.java | 251 ++++++++++
 .../conf/UnorderedKVInputConfigurer.java        | 334 +++++++++++++
 .../conf/UnorderedKVOutputConfigurer.java       | 259 ++++++++++
 .../UnorderedPartitionedKVEdgeConfigurer.java   |  38 +-
 .../UnorderedPartitionedKVOutputConfigurer.java | 304 ++++++++++++
 .../UnorderedUnpartitionedKVEdgeConfigurer.java | 251 ----------
 .../runtime/library/input/LocalMergedInput.java |  67 ---
 .../input/OrderedGroupedInputLegacy.java        |  78 +++
 .../library/input/OrderedGroupedKVInput.java    | 349 +++++++++++++
 .../input/OrderedGroupedMergedKVInput.java      | 245 +++++++++
 .../library/input/ShuffledMergedInput.java      | 349 -------------
 .../input/ShuffledMergedInputLegacy.java        |  78 ---
 .../library/input/ShuffledUnorderedKVInput.java | 264 ----------
 .../library/input/SortedGroupedMergedInput.java | 245 ---------
 .../runtime/library/input/UnorderedKVInput.java | 264 ++++++++++
 .../library/output/LocalOnFileSorterOutput.java |  67 ---
 .../library/output/OnFileSortedOutput.java      | 249 ----------
 .../library/output/OnFileUnorderedKVOutput.java | 193 --------
 .../OnFileUnorderedPartitionedKVOutput.java     | 133 -----
 .../output/OrderedPartitionedKVOutput.java      | 249 ++++++++++
 .../library/output/UnorderedKVOutput.java       | 193 ++++++++
 .../output/UnorderedPartitionedKVOutput.java    | 133 +++++
 .../WeightedScalingMemoryDistributor.java       |  20 +-
 .../TestWeightedScalingMemoryDistributor.java   |  18 +-
 .../conf/TestOnFileSortedOutputConfigurer.java  |  38 +-
 .../TestOnFileUnorderedKVOutputConfigurer.java  |  24 +-
 ...eUnorderedPartitionedKVOutputConfigurer.java |  36 +-
 .../TestOrderedPartitionedKVEdgeConfigurer.java |  20 +-
 .../conf/TestShuffledMergedInputConfigurer.java |  28 +-
 .../TestShuffledUnorderedKVInputConfigurer.java |  20 +-
 ...estUnorderedPartitionedKVEdgeConfigurer.java |  24 +-
 ...tUnorderedUnpartitionedKVEdgeConfigurer.java |  44 +-
 .../input/TestSortedGroupedMergedInput.java     |  16 +-
 .../library/output/TestOnFileSortedOutput.java  |   4 +-
 .../output/TestOnFileUnorderedKVOutput.java     |   4 +-
 .../examples/BroadcastAndOneToOneExample.java   |   8 +-
 .../mapreduce/examples/FilterLinesByWord.java   |   4 +-
 .../examples/FilterLinesByWordOneToOne.java     |   4 +-
 .../examples/TestOrderedWordCount.java          |   2 +-
 .../processor/FilterByWordInputProcessor.java   |   6 +-
 .../processor/FilterByWordOutputProcessor.java  |   6 +-
 .../apache/tez/mapreduce/TestMRRJobsDAGApi.java |  12 +-
 71 files changed, 4404 insertions(+), 4292 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/469bf905/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3b3f951..2ef940b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -66,6 +66,7 @@ INCOMPATIBLE CHANGES
   TEZ-1418. Provide Default value for TEZ_AM_LAUNCH_ENV and TEZ_TASK_LAUNCH
   TEZ-1438. Annotate add java doc for tez-runtime-library and tez-mapreduce
   TEZ-1055. Rename tez-mapreduce-examples to tez-examples
+  TEZ-1132. Consistent naming of Input and Outputs
 
 Release 0.4.0-incubating: 2014-04-05
 

http://git-wip-us.apache.org/repos/asf/tez/blob/469bf905/tez-api/src/main/java/org/apache/tez/client/PreWarmVertex.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/PreWarmVertex.java b/tez-api/src/main/java/org/apache/tez/client/PreWarmVertex.java
deleted file mode 100644
index f3db312..0000000
--- a/tez-api/src/main/java/org/apache/tez/client/PreWarmVertex.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.client;
-
-import org.apache.hadoop.classification.InterfaceAudience.Public;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.runtime.api.Processor;
-
-/**
- * A {@link PreWarmVertex} is used to specify parameters to be used to setup
- * prewarmed containers for Tez session mode. Sessions allow re-use of execution
- * slots (containers) across DAG's. Pre- warming allows pre-allocation of
- * containers so that the first DAG has some execution resources already
- * available to re-use. In order to get re-use containers they must be setup
- * identically. So the prewarm vertex must be setup identically to the real DAG
- * vertex (typically the first vertex to execute in the read DAG). Identical
- * settings include same execution resources, same task local files etc. This
- * works best in use cases where all DAGs share the same files/jars/resource
- * settings from a common template<br>
- * The parallelism of the pre-warm vertex determines the number of containers to
- * be pre-warmed. This would ideally ensures a viable number of containers to
- * provide performance while sharing resources with other applications.
- * Typically the session would also hold onto the same number of containers
- * in-between DAGs in session mode via the
- * {@link TezConfiguration#TEZ_AM_SESSION_MIN_HELD_CONTAINERS} property. The
- * prewarm vertex by default runs the PreWarmProcessor from the Tez runtime
- * library. This processor can be overridden to get the default behavior along
- * with any app specific customizations. Alternatively, the application can
- * provide any {@link Processor} to prewarm the containers. Pre-warming
- * processors can be used to initialize classes etc. and setup the environment
- * for the actual processing to reduce latency.
- */
-@Unstable
-@Public
-public class PreWarmVertex extends Vertex {
-
-  /**
-   * Create a {@link PreWarmVertex} to be used in
-   * {@link TezClient#preWarm(PreWarmVertex)} It may be necessary to call more
-   * methods to add local files etc on the pre-warm vertex post creation so that
-   * it matches the real DAG vertices.
-   * 
-   * @param vertexName
-   *          Name of the vertex
-   * @param processorDescriptor
-   *          Descriptor of the processor to be run
-   * @param parallelism
-   *          Number of containers to be pre-warmed
-   * @param taskResource
-   *          Execution cpu/memory resources etc needed
-   */
-  public PreWarmVertex(String vertexName, ProcessorDescriptor processorDescriptor, int parallelism,
-      Resource taskResource) {
-    super(vertexName, processorDescriptor, parallelism, taskResource);
-  }
-  
-  /**
-   * Create a {@link PreWarmVertex} to be used in @link
-   * {@link TezClient#preWarm(PreWarmVertex)} This uses a built in pre-warm
-   * processor that implements common functionality. Users may derive from this
-   * processor to add custom functionality but then they must add the jar for
-   * that class to the prewarm vertex and other vertices in their DAG for which
-   * they want the containers to be reused. It may be necessary to call more
-   * methods to add local files etc on the pre-warm vertex post creation so that
-   * it matches the real DAG vertices.
-   * 
-   * @param vertexName
-   *          Name of the vertex
-   * @param parallelism
-   *          Number of containers to be pre-warmed
-   * @param taskResource
-   *          Execution cpu/memory resources etc needed
-   */
-  public PreWarmVertex(String vertexName, int parallelism, Resource taskResource) {
-    this(vertexName, new ProcessorDescriptor(
-        "org.apache.tez.runtime.library.processor.PreWarmProcessor"), parallelism, taskResource);
-  }
-  
-  /**
-   * Create a configurer for the @link {@link PreWarmVertex}. This may be used to construct the 
-   * pre-warm vertex more flexibly.
-   * @param conf
-   * @return
-   */
-  public static PreWarmVertexConfigurer createConfigurer(Configuration conf) {
-    return new PreWarmVertexConfigurer(conf);
-  }
-  
-  /**
-   * Setup the prewarm vertex constructor. By default is uses the built-in
-   * PreWarmProcessor and sets up the prewarm container number equal to
-   * {@link TezConfiguration#TEZ_AM_SESSION_MIN_HELD_CONTAINERS}
-   */
-  public static class PreWarmVertexConfigurer {
-    String name;
-    int parallelism;
-    ProcessorDescriptor proc;
-    Resource resource;
-    Configuration conf;
-    
-    PreWarmVertexConfigurer(Configuration conf) {
-      this.conf = conf;
-    }
-    
-    public PreWarmVertexConfigurer setName(String name) {
-      this.name = name;
-      return this;
-    }
-    
-    public PreWarmVertexConfigurer setProcessorDescriptor(ProcessorDescriptor proc) {
-      this.proc = proc;
-      return this;
-    }
-    
-    public PreWarmVertexConfigurer setResource(Resource resource) {
-      this.resource = resource;
-      return this;
-    }
-    
-    public PreWarmVertexConfigurer setParallelism(int parallelism) {
-      this.parallelism = parallelism;
-      return this;
-    }
-    
-    public PreWarmVertex create() {
-      if (name == null) {
-        name = "_PreWarm_";
-      }
-      if (parallelism == 0) {
-        parallelism = conf.getInt(TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS, -1);
-        if (parallelism == -1) {
-          throw new TezUncheckedException("Prewarm parallelism must be set or specified in conf via " 
-              + TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS);
-        }
-      }
-      if (proc == null) {
-        proc = new ProcessorDescriptor("org.apache.tez.runtime.library.processor.PreWarmProcessor");
-      }
-      
-      return new PreWarmVertex(name, proc, parallelism, resource);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/469bf905/tez-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index 5e6c91c..0891694 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -43,6 +43,7 @@ import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.DAGSubmissionTimedOut;
 import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.api.PreWarmVertex;
 import org.apache.tez.dag.api.SessionNotRunning;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;

http://git-wip-us.apache.org/repos/asf/tez/blob/469bf905/tez-api/src/main/java/org/apache/tez/common/security/DAGAccessControls.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/security/DAGAccessControls.java b/tez-api/src/main/java/org/apache/tez/common/security/DAGAccessControls.java
index 94e3649..1b1e90e 100644
--- a/tez-api/src/main/java/org/apache/tez/common/security/DAGAccessControls.java
+++ b/tez-api/src/main/java/org/apache/tez/common/security/DAGAccessControls.java
@@ -25,12 +25,14 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.dag.api.TezConstants;
 
 /**
  * Access controls for the DAG
  */
+@Public
 public class DAGAccessControls {
 
   private final Set<String> usersWithViewACLs;

http://git-wip-us.apache.org/repos/asf/tez/blob/469bf905/tez-api/src/main/java/org/apache/tez/dag/api/PreWarmVertex.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/PreWarmVertex.java b/tez-api/src/main/java/org/apache/tez/dag/api/PreWarmVertex.java
new file mode 100644
index 0000000..69dc5d8
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/PreWarmVertex.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.runtime.api.Processor;
+
+/**
+ * A {@link PreWarmVertex} is used to specify parameters to be used to setup
+ * prewarmed containers for Tez session mode. Sessions allow re-use of execution
+ * slots (containers) across DAGs. Pre-warming allows pre-allocation of
+ * containers so that the first DAG has some execution resources already
+ * available to re-use. In order to get re-use containers they must be setup
+ * identically. So the prewarm vertex must be setup identically to the real DAG
+ * vertex (typically the first vertex to execute in the real DAG). Identical
+ * settings include same execution resources, same task local files etc. This
+ * works best in use cases where all DAGs share the same files/jars/resource
+ * settings from a common template<br>
+ * The parallelism of the pre-warm vertex determines the number of containers to
+ * be pre-warmed. This would ideally ensure a viable number of containers to
+ * provide performance while sharing resources with other applications.
+ * Typically the session would also hold onto the same number of containers
+ * in-between DAGs in session mode via the
+ * {@link TezConfiguration#TEZ_AM_SESSION_MIN_HELD_CONTAINERS} property. The
+ * prewarm vertex by default runs the PreWarmProcessor from the Tez runtime
+ * library. This processor can be overridden to get the default behavior along
+ * with any app specific customizations. Alternatively, the application can
+ * provide any {@link Processor} to prewarm the containers. Pre-warming
+ * processors can be used to initialize classes etc. and setup the environment
+ * for the actual processing to reduce latency.
+ */
+@Unstable
+@Public
+public class PreWarmVertex extends Vertex {
+
+  /**
+   * Create a {@link PreWarmVertex} to be used in
+   * {@link TezClient#preWarm(PreWarmVertex)}. It may be necessary to call more
+   * methods to add local files etc on the pre-warm vertex post creation so that
+   * it matches the real DAG vertices.
+   * 
+   * @param vertexName
+   *          Name of the vertex
+   * @param processorDescriptor
+   *          Descriptor of the processor to be run
+   * @param parallelism
+   *          Number of containers to be pre-warmed
+   * @param taskResource
+   *          Execution cpu/memory resources etc needed
+   */
+  public PreWarmVertex(String vertexName, ProcessorDescriptor processorDescriptor, int parallelism,
+      Resource taskResource) {
+    super(vertexName, processorDescriptor, parallelism, taskResource);
+  }
+  
+  /**
+   * Create a {@link PreWarmVertex} to be used in
+   * {@link TezClient#preWarm(PreWarmVertex)}. This uses a built-in pre-warm
+   * processor that implements common functionality. Users may derive from this
+   * processor to add custom functionality but then they must add the jar for
+   * that class to the prewarm vertex and other vertices in their DAG for which
+   * they want the containers to be reused. It may be necessary to call more
+   * methods to add local files etc on the pre-warm vertex post creation so that
+   * it matches the real DAG vertices.
+   * 
+   * @param vertexName
+   *          Name of the vertex
+   * @param parallelism
+   *          Number of containers to be pre-warmed
+   * @param taskResource
+   *          Execution cpu/memory resources etc needed
+   */
+  public PreWarmVertex(String vertexName, int parallelism, Resource taskResource) {
+    this(vertexName, new ProcessorDescriptor(
+        "org.apache.tez.runtime.library.processor.PreWarmProcessor"), parallelism, taskResource);
+  }
+  
+  /**
+   * Create a configurer for the {@link PreWarmVertex}. This may be used to construct the
+   * pre-warm vertex more flexibly.
+   * @param conf
+   * @return
+   */
+  public static PreWarmVertexConfigurer createConfigurer(Configuration conf) {
+    return new PreWarmVertexConfigurer(conf);
+  }
+  
+  /**
+   * Setup the prewarm vertex constructor. By default it uses the built-in
+   * PreWarmProcessor and sets up the prewarm container number equal to
+   * {@link TezConfiguration#TEZ_AM_SESSION_MIN_HELD_CONTAINERS}
+   */
+  public static class PreWarmVertexConfigurer {
+    String name;
+    int parallelism;
+    ProcessorDescriptor proc;
+    Resource resource;
+    Configuration conf;
+    
+    PreWarmVertexConfigurer(Configuration conf) {
+      this.conf = conf;
+    }
+    
+    public PreWarmVertexConfigurer setName(String name) {
+      this.name = name;
+      return this;
+    }
+    
+    public PreWarmVertexConfigurer setProcessorDescriptor(ProcessorDescriptor proc) {
+      this.proc = proc;
+      return this;
+    }
+    
+    public PreWarmVertexConfigurer setResource(Resource resource) {
+      this.resource = resource;
+      return this;
+    }
+    
+    public PreWarmVertexConfigurer setParallelism(int parallelism) {
+      this.parallelism = parallelism;
+      return this;
+    }
+    
+    public PreWarmVertex create() {
+      if (name == null) {
+        name = "_PreWarm_";
+      }
+      if (parallelism == 0) {
+        parallelism = conf.getInt(TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS, -1);
+        if (parallelism == -1) {
+          throw new TezUncheckedException("Prewarm parallelism must be set or specified in conf via " 
+              + TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS);
+        }
+      }
+      if (proc == null) {
+        proc = new ProcessorDescriptor("org.apache.tez.runtime.library.processor.PreWarmProcessor");
+      }
+      
+      return new PreWarmVertex(name, proc, parallelism, resource);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/469bf905/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/package-info.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/package-info.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/package-info.java
new file mode 100644
index 0000000..3c18e02
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+@Private
+package org.apache.tez.dag.api.client.rpc;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/469bf905/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
index 5615fda..8e3c397 100644
--- a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
+++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.PreWarmVertex;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;

http://git-wip-us.apache.org/repos/asf/tez/blob/469bf905/tez-common/src/main/java/org/apache/tez/common/TezContainerLogAppender.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezContainerLogAppender.java b/tez-common/src/main/java/org/apache/tez/common/TezContainerLogAppender.java
index cfb1e92..2cfacfb 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezContainerLogAppender.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezContainerLogAppender.java
@@ -20,7 +20,6 @@ package org.apache.tez.common;
 
 import java.io.File;
 
-import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.log4j.FileAppender;
 import org.apache.tez.dag.api.TezConstants;
@@ -29,7 +28,6 @@ import org.apache.tez.dag.api.TezConstants;
  * A simple log4j-appender for a tez container's logs.
  * 
  */
-@Public
 @Unstable
 public class TezContainerLogAppender extends FileAppender {
   private String containerLogDir;

http://git-wip-us.apache.org/repos/asf/tez/blob/469bf905/tez-common/src/main/java/org/apache/tez/common/package-info.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/package-info.java b/tez-common/src/main/java/org/apache/tez/common/package-info.java
new file mode 100644
index 0000000..88a5e86
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/common/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+@Private
+package org.apache.tez.common;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/469bf905/tez-common/src/main/java/org/apache/tez/dag/records/package-info.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/package-info.java b/tez-common/src/main/java/org/apache/tez/dag/records/package-info.java
new file mode 100644
index 0000000..319ab6b
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+@Private
+package org.apache.tez.dag.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/469bf905/tez-common/src/main/java/org/apache/tez/dag/utils/package-info.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/utils/package-info.java b/tez-common/src/main/java/org/apache/tez/dag/utils/package-info.java
new file mode 100644
index 0000000..0cfda84
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/dag/utils/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+@Private
+package org.apache.tez.dag.utils;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/469bf905/tez-common/src/main/java/org/apache/tez/runtime/common/resources/package-info.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/runtime/common/resources/package-info.java b/tez-common/src/main/java/org/apache/tez/runtime/common/resources/package-info.java
new file mode 100644
index 0000000..49689e0
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/runtime/common/resources/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+@Private
+package org.apache.tez.runtime.common.resources;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/469bf905/tez-examples/src/main/java/org/apache/tez/examples/SimpleSessionExample.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/SimpleSessionExample.java b/tez-examples/src/main/java/org/apache/tez/examples/SimpleSessionExample.java
index 1d51e9e..81addcf 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/SimpleSessionExample.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/SimpleSessionExample.java
@@ -23,9 +23,9 @@ import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
-import org.apache.tez.client.PreWarmVertex;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.PreWarmVertex;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;

http://git-wip-us.apache.org/repos/asf/tez/blob/469bf905/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
index f5d920f..142e69c 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
@@ -50,7 +50,7 @@ import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.ProcessorContext;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
-import org.apache.tez.runtime.library.output.OnFileSortedOutput;
+import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;
 
 @SuppressWarnings({ "unchecked", "rawtypes" })
 @Private
@@ -113,8 +113,8 @@ public class MapProcessor extends MRTask{
     KeyValueWriter kvWriter = null;
     if ((out instanceof MROutputLegacy)) {
       kvWriter = ((MROutputLegacy)out).getWriter();
-    } else if ((out instanceof OnFileSortedOutput)){
-      kvWriter = ((OnFileSortedOutput)out).getWriter();
+    } else if ((out instanceof OrderedPartitionedKVOutput)){
+      kvWriter = ((OrderedPartitionedKVOutput)out).getWriter();
     } else {
       throw new IOException("Illegal output to map, outputClass="
           + out.getClass());

http://git-wip-us.apache.org/repos/asf/tez/blob/469bf905/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
index 3d94a58..9469236 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
@@ -51,8 +51,8 @@ import org.apache.tez.runtime.library.api.KeyValueWriter;
 import org.apache.tez.runtime.library.api.KeyValuesReader;
 import org.apache.tez.runtime.library.common.ConfigUtils;
 import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
-import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
-import org.apache.tez.runtime.library.output.OnFileSortedOutput;
+import org.apache.tez.runtime.library.input.OrderedGroupedInputLegacy;
+import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;
 
 @Private
 @SuppressWarnings({ "unchecked", "rawtypes" })
@@ -123,17 +123,17 @@ public class ReduceProcessor extends MRTask {
         mrReporter.getCounter(TaskCounter.REDUCE_INPUT_RECORDS);
 
     // Sanity check
-    if (!(in instanceof ShuffledMergedInputLegacy)) {
+    if (!(in instanceof OrderedGroupedInputLegacy)) {
       throw new IOException("Illegal input to reduce: " + in.getClass());
     }
-    ShuffledMergedInputLegacy shuffleInput = (ShuffledMergedInputLegacy)in;
+    OrderedGroupedInputLegacy shuffleInput = (OrderedGroupedInputLegacy)in;
     KeyValuesReader kvReader = shuffleInput.getReader();
 
     KeyValueWriter kvWriter = null;
     if((out instanceof MROutputLegacy)) {
       kvWriter = ((MROutputLegacy) out).getWriter();
-    } else if ((out instanceof OnFileSortedOutput)) {
-      kvWriter = ((OnFileSortedOutput) out).getWriter();
+    } else if ((out instanceof OrderedPartitionedKVOutput)) {
+      kvWriter = ((OrderedPartitionedKVOutput) out).getWriter();
     } else {
       throw new IOException("Illegal output to reduce: " + in.getClass());
     }
@@ -264,7 +264,7 @@ public class ReduceProcessor extends MRTask {
 
   void runNewReducer(JobConf job,
       final MRTaskReporter reporter,
-      ShuffledMergedInputLegacy input,
+      OrderedGroupedInputLegacy input,
       RawComparator comparator,
       Class keyClass,
       Class valueClass,

http://git-wip-us.apache.org/repos/asf/tez/blob/469bf905/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/LocalMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/LocalMergedInput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/LocalMergedInput.java
new file mode 100644
index 0000000..da0921b
--- /dev/null
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/LocalMergedInput.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.input;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.library.common.localshuffle.LocalShuffle;
+import org.apache.tez.runtime.library.input.OrderedGroupedInputLegacy;
+
+/**
+ * <code>LocalMergedInput</code> is a {@link LogicalInput} which shuffles intermediate
+ * sorted data, merges it and provides key/&lt;values&gt; to the consumer.
+ */
+public class LocalMergedInput extends OrderedGroupedInputLegacy {
+
+  public LocalMergedInput(InputContext inputContext, int numPhysicalInputs) {
+    super(inputContext, numPhysicalInputs);
+  }
+
+  @Override
+  public List<Event> initialize() throws IOException {
+    getContext().requestInitialMemory(0l, null); // mandatory call.
+    getContext().inputIsReady();
+    this.conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
+
+    if (getNumPhysicalInputs() == 0) {
+      return Collections.emptyList();
+    }
+
+    LocalShuffle localShuffle = new LocalShuffle(getContext(), conf, getNumPhysicalInputs());
+    rawIter = localShuffle.run();
+    createValuesIterator();
+    return Collections.emptyList();
+  }
+  
+  @Override
+  public void start() throws IOException {
+  }
+
+  @Override
+  public List<Event> close() throws IOException {
+    if (getNumPhysicalInputs() != 0) {
+      rawIter.close();
+    }
+    return Collections.emptyList();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/469bf905/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/LocalOnFileSorterOutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/LocalOnFileSorterOutput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/LocalOnFileSorterOutput.java
new file mode 100644
index 0000000..1546614
--- /dev/null
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/LocalOnFileSorterOutput.java
@@ -0,0 +1,68 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce.output;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
+import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;
+
+public class LocalOnFileSorterOutput extends OrderedPartitionedKVOutput {
+
+  private static final Log LOG = LogFactory.getLog(LocalOnFileSorterOutput.class);
+
+  public LocalOnFileSorterOutput(OutputContext outputContext, int numPhysicalOutputs) {
+    super(outputContext, numPhysicalOutputs);
+  }
+
+
+  @Override
+  public List<Event> close() throws IOException {
+    LOG.debug("Closing LocalOnFileSorterOutput");
+    super.close();
+
+    TezTaskOutput mapOutputFile = sorter.getMapOutput();
+    FileSystem localFs = FileSystem.getLocal(conf);
+
+    Path src = mapOutputFile.getOutputFile();
+    Path dst =
+        mapOutputFile.getInputFileForWrite(
+            getContext().getTaskIndex(),
+            localFs.getFileStatus(src).getLen());
+
+    LOG.info("Renaming src = " + src + ", dst = " + dst);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Renaming src = " + src + ", dst = " + dst);
+    }
+    localFs.rename(src, dst);
+    return null;
+  }
+  
+  @Override
+  protected List<Event> generateEventsOnClose() throws IOException {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/469bf905/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
index 69702e5..a69a20f 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
@@ -41,6 +41,7 @@ import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.tez.mapreduce.output.LocalOnFileSorterOutput;
 import org.apache.tez.mapreduce.partition.MRPartitioner;
 import org.apache.tez.mapreduce.processor.MapUtils;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos;
@@ -54,7 +55,6 @@ import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.sort.impl.IFile;
 import org.apache.tez.runtime.library.common.task.local.output.TezLocalTaskOutputFiles;
 import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
-import org.apache.tez.runtime.library.output.LocalOnFileSorterOutput;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/tez/blob/469bf905/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
index b100ab2..a051044 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
@@ -50,7 +50,9 @@ import org.apache.tez.mapreduce.hadoop.IDConverter;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
+import org.apache.tez.mapreduce.input.LocalMergedInput;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.tez.mapreduce.output.LocalOnFileSorterOutput;
 import org.apache.tez.mapreduce.output.MROutputLegacy;
 import org.apache.tez.mapreduce.partition.MRPartitioner;
 import org.apache.tez.mapreduce.processor.MapUtils;
@@ -63,8 +65,6 @@ import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.Constants;
 import org.apache.tez.runtime.library.common.task.local.output.TezLocalTaskOutputFiles;
 import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
-import org.apache.tez.runtime.library.input.LocalMergedInput;
-import org.apache.tez.runtime.library.output.LocalOnFileSorterOutput;
 import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
 import org.junit.After;
 import org.junit.Assert;

http://git-wip-us.apache.org/repos/asf/tez/blob/469bf905/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/ShuffledUnorderedKVReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/ShuffledUnorderedKVReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/ShuffledUnorderedKVReader.java
deleted file mode 100644
index 0d5fb28..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/ShuffledUnorderedKVReader.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.runtime.library.common.readers;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.runtime.library.api.KeyValueReader;
-import org.apache.tez.runtime.library.common.ConfigUtils;
-import org.apache.tez.runtime.library.common.shuffle.impl.InMemoryReader;
-import org.apache.tez.runtime.library.common.sort.impl.IFile;
-import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
-import org.apache.tez.runtime.library.shuffle.common.FetchedInput.Type;
-import org.apache.tez.runtime.library.shuffle.common.impl.ShuffleManager;
-import org.apache.tez.runtime.library.shuffle.common.MemoryFetchedInput;
-
-@Unstable
-@Private
-public class ShuffledUnorderedKVReader<K, V> extends KeyValueReader {
-
-  private static final Log LOG = LogFactory.getLog(ShuffledUnorderedKVReader.class);
-  
-  private final ShuffleManager shuffleManager;
-  private final CompressionCodec codec;
-  
-  private final Class<K> keyClass;
-  private final Class<V> valClass;
-  private final Deserializer<K> keyDeserializer;
-  private final Deserializer<V> valDeserializer;
-  private final DataInputBuffer keyIn;
-  private final DataInputBuffer valIn;
-
-  private final boolean ifileReadAhead;
-  private final int ifileReadAheadLength;
-  private final int ifileBufferSize;
-  
-  private final TezCounter inputRecordCounter;
-  
-  private K key;
-  private V value;
-  
-  private FetchedInput currentFetchedInput;
-  private IFile.Reader currentReader;
-  
-  // TODO Remove this once per I/O counters are separated properly. Relying on
-  // the counter at the moment will generate aggregate numbers. 
-  private int numRecordsRead = 0;
-  
-  public ShuffledUnorderedKVReader(ShuffleManager shuffleManager, Configuration conf,
-      CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLength, int ifileBufferSize,
-      TezCounter inputRecordCounter)
-      throws IOException {
-    this.shuffleManager = shuffleManager;
-
-    this.codec = codec;
-    this.ifileReadAhead = ifileReadAhead;
-    this.ifileReadAheadLength = ifileReadAheadLength;
-    this.ifileBufferSize = ifileBufferSize;
-    this.inputRecordCounter = inputRecordCounter;
-
-    this.keyClass = ConfigUtils.getIntermediateInputKeyClass(conf);
-    this.valClass = ConfigUtils.getIntermediateInputValueClass(conf);
-
-    this.keyIn = new DataInputBuffer();
-    this.valIn = new DataInputBuffer();
-
-    SerializationFactory serializationFactory = new SerializationFactory(conf);
-
-    this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
-    this.keyDeserializer.open(keyIn);
-    this.valDeserializer = serializationFactory.getDeserializer(valClass);
-    this.valDeserializer.open(valIn);
-  }
-
-  // TODO NEWTEZ Maybe add an interface to check whether next will block.
-  
-  /**
-   * Moves to the next key/values(s) pair
-   * 
-   * @return true if another key/value(s) pair exists, false if there are no
-   *         more.
-   * @throws IOException
-   *           if an error occurs
-   */
-  @Override  
-  public boolean next() throws IOException {
-    if (readNextFromCurrentReader()) {
-      inputRecordCounter.increment(1);
-      numRecordsRead++;
-      return true;
-    } else {
-      boolean nextInputExists = moveToNextInput();
-      while (nextInputExists) {
-        if(readNextFromCurrentReader()) {
-          inputRecordCounter.increment(1);
-          numRecordsRead++;
-          return true;
-        }
-        nextInputExists = moveToNextInput();
-      }
-      LOG.info("Num Records read: " + numRecordsRead);
-      return false;
-    }
-  }
-
-  @Override
-  public Object getCurrentKey() throws IOException {
-    return (Object) key;
-  }
-
-  @Override
-  public Object getCurrentValue() throws IOException {
-    return value;
-  }
-
-  /**
-   * Tries reading the next key and value from the current reader.
-   * @return true if the current reader has more records
-   * @throws IOException
-   */
-  private boolean readNextFromCurrentReader() throws IOException {
-    // Initial reader.
-    if (this.currentReader == null) {
-      return false;
-    } else {
-      boolean hasMore = this.currentReader.nextRawKey(keyIn);
-      if (hasMore) {
-        this.currentReader.nextRawValue(valIn);
-        this.key = keyDeserializer.deserialize(this.key);
-        this.value = valDeserializer.deserialize(this.value);
-        return true;
-      }
-      return false;
-    }
-  }
-  
-  /**
-   * Moves to the next available input. This method may block if the input is not ready yet.
-   * Also takes care of closing the previous input.
-   * 
-   * @return true if the next input exists, false otherwise
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  private boolean moveToNextInput() throws IOException {
-    if (currentReader != null) { // Close the current reader.
-      currentReader.close();
-      currentFetchedInput.free();
-    }
-    try {
-      currentFetchedInput = shuffleManager.getNextInput();
-    } catch (InterruptedException e) {
-      LOG.warn("Interrupted while waiting for next available input", e);
-      throw new IOException(e);
-    }
-    if (currentFetchedInput == null) {
-      return false; // No more inputs
-    } else {
-      currentReader = openIFileReader(currentFetchedInput);
-      return true;
-    }
-  }
-
-  public IFile.Reader openIFileReader(FetchedInput fetchedInput)
-      throws IOException {
-    if (fetchedInput.getType() == Type.MEMORY) {
-      MemoryFetchedInput mfi = (MemoryFetchedInput) fetchedInput;
-
-      return new InMemoryReader(null, mfi.getInputAttemptIdentifier(),
-          mfi.getBytes(), 0, (int) mfi.getActualSize());
-    } else {
-      return new IFile.Reader(fetchedInput.getInputStream(),
-          fetchedInput.getCompressedSize(), codec, null, null, ifileReadAhead,
-          ifileReadAheadLength, ifileBufferSize);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/469bf905/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
new file mode 100644
index 0000000..a819dc7
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.readers;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.common.shuffle.impl.InMemoryReader;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
+import org.apache.tez.runtime.library.shuffle.common.FetchedInput.Type;
+import org.apache.tez.runtime.library.shuffle.common.impl.ShuffleManager;
+import org.apache.tez.runtime.library.shuffle.common.MemoryFetchedInput;
+
+@Unstable
+@Private
+public class UnorderedKVReader<K, V> extends KeyValueReader {
+
+  private static final Log LOG = LogFactory.getLog(UnorderedKVReader.class);
+  
+  private final ShuffleManager shuffleManager;
+  private final CompressionCodec codec;
+  
+  private final Class<K> keyClass;
+  private final Class<V> valClass;
+  private final Deserializer<K> keyDeserializer;
+  private final Deserializer<V> valDeserializer;
+  private final DataInputBuffer keyIn;
+  private final DataInputBuffer valIn;
+
+  private final boolean ifileReadAhead;
+  private final int ifileReadAheadLength;
+  private final int ifileBufferSize;
+  
+  private final TezCounter inputRecordCounter;
+  
+  private K key;
+  private V value;
+  
+  private FetchedInput currentFetchedInput;
+  private IFile.Reader currentReader;
+  
+  // TODO Remove this once per I/O counters are separated properly. Relying on
+  // the counter at the moment will generate aggregate numbers. 
+  private int numRecordsRead = 0;
+  
+  public UnorderedKVReader(ShuffleManager shuffleManager, Configuration conf,
+      CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLength, int ifileBufferSize,
+      TezCounter inputRecordCounter)
+      throws IOException {
+    this.shuffleManager = shuffleManager;
+
+    this.codec = codec;
+    this.ifileReadAhead = ifileReadAhead;
+    this.ifileReadAheadLength = ifileReadAheadLength;
+    this.ifileBufferSize = ifileBufferSize;
+    this.inputRecordCounter = inputRecordCounter;
+
+    this.keyClass = ConfigUtils.getIntermediateInputKeyClass(conf);
+    this.valClass = ConfigUtils.getIntermediateInputValueClass(conf);
+
+    this.keyIn = new DataInputBuffer();
+    this.valIn = new DataInputBuffer();
+
+    SerializationFactory serializationFactory = new SerializationFactory(conf);
+
+    this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
+    this.keyDeserializer.open(keyIn);
+    this.valDeserializer = serializationFactory.getDeserializer(valClass);
+    this.valDeserializer.open(valIn);
+  }
+
+  // TODO NEWTEZ Maybe add an interface to check whether next will block.
+  
+  /**
+   * Moves to the next key/value pair.
+   * 
+   * @return true if another key/value pair exists, false if there are no
+   *         more.
+   * @throws IOException
+   *           if an error occurs
+   */
+  @Override  
+  public boolean next() throws IOException {
+    if (readNextFromCurrentReader()) {
+      inputRecordCounter.increment(1);
+      numRecordsRead++;
+      return true;
+    } else {
+      boolean nextInputExists = moveToNextInput();
+      while (nextInputExists) {
+        if(readNextFromCurrentReader()) {
+          inputRecordCounter.increment(1);
+          numRecordsRead++;
+          return true;
+        }
+        nextInputExists = moveToNextInput();
+      }
+      LOG.info("Num Records read: " + numRecordsRead);
+      return false;
+    }
+  }
+
+  @Override
+  public Object getCurrentKey() throws IOException {
+    return (Object) key;
+  }
+
+  @Override
+  public Object getCurrentValue() throws IOException {
+    return value;
+  }
+
+  /**
+   * Tries reading the next key and value from the current reader.
+   * @return true if the current reader has more records
+   * @throws IOException
+   */
+  private boolean readNextFromCurrentReader() throws IOException {
+    // No input has been opened yet, so there is nothing to read.
+    if (this.currentReader == null) {
+      return false;
+    } else {
+      boolean hasMore = this.currentReader.nextRawKey(keyIn);
+      if (hasMore) {
+        this.currentReader.nextRawValue(valIn);
+        this.key = keyDeserializer.deserialize(this.key);
+        this.value = valDeserializer.deserialize(this.value);
+        return true;
+      }
+      return false;
+    }
+  }
+  
+  /**
+   * Moves to the next available input. This method may block if the input is not ready yet.
+   * Also takes care of closing the previous input.
+   * 
+   * @return true if the next input exists, false otherwise
+   * @throws IOException if closing or opening a reader fails, or if the wait for the
+   *           next input is interrupted (the InterruptedException is wrapped)
+   */
+  private boolean moveToNextInput() throws IOException {
+    if (currentReader != null) { // Close the current reader.
+      currentReader.close();
+      currentFetchedInput.free();
+    }
+    try {
+      currentFetchedInput = shuffleManager.getNextInput();
+    } catch (InterruptedException e) {
+      LOG.warn("Interrupted while waiting for next available input", e);
+      throw new IOException(e);
+    }
+    if (currentFetchedInput == null) {
+      return false; // No more inputs
+    } else {
+      currentReader = openIFileReader(currentFetchedInput);
+      return true;
+    }
+  }
+
+  public IFile.Reader openIFileReader(FetchedInput fetchedInput)
+      throws IOException {
+    if (fetchedInput.getType() == Type.MEMORY) {
+      MemoryFetchedInput mfi = (MemoryFetchedInput) fetchedInput;
+
+      return new InMemoryReader(null, mfi.getInputAttemptIdentifier(),
+          mfi.getBytes(), 0, (int) mfi.getActualSize());
+    } else {
+      return new IFile.Reader(fetchedInput.getInputStream(),
+          fetchedInput.getCompressedSize(), codec, null, null, ifileReadAhead,
+          ifileReadAheadLength, ifileBufferSize);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/469bf905/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileSortedOutputConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileSortedOutputConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileSortedOutputConfigurer.java
deleted file mode 100644
index 8ace60c..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileSortedOutputConfigurer.java
+++ /dev/null
@@ -1,418 +0,0 @@
-/*
- * *
- *  * Licensed to the Apache Software Foundation (ASF) under one
- *  * or more contributor license agreements.  See the NOTICE file
- *  * distributed with this work for additional information
- *  * regarding copyright ownership.  The ASF licenses this file
- *  * to you under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance
- *  * with the License.  You may obtain a copy of the License at
- *  *
- *  *     http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.tez.runtime.library.conf;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.util.Map;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
-import org.apache.tez.runtime.library.common.ConfigUtils;
-import org.apache.tez.runtime.library.output.OnFileSortedOutput;
-
-
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-/**
- * Configure {@link org.apache.tez.runtime.library.output.OnFileSortedOutput} </p>
- *
- * Values will be picked up from tez-site if not specified, otherwise defaults from
- * {@link org.apache.tez.runtime.library.api.TezRuntimeConfiguration} will be used.
- */
-public class OnFileSortedOutputConfigurer {
-
-  /**
-   * Configure parameters which are specific to the Output.
-   */
-  @InterfaceAudience.Private
-  public static interface SpecificConfigurer<T> extends BaseConfigurer<T> {
-    /**
-     * Set the buffer size to use when sort the output
-     *
-     * @param sortBufferSize the size of the buffer in MB
-     * @return instance of the current builder
-     */
-    public T setSortBufferSize(int sortBufferSize);
-
-
-    /**
-     * Configure the combiner class
-     *
-     * @param combinerClassName the combiner class name
-     * @return instance of the current builder
-     */
-    public T setCombiner(String combinerClassName);
-
-    /**
-     * Configure the combiner class and it's associated configuration (specified as key-value
-     * pairs). This method should only be used if the combiner requires some specific configuration.
-     * {@link #setCombiner(String)} is the preferred method for setting a combiner.
-     *
-     * @param combinerClassName the combiner class name
-     * @param combinerConf      the combiner configuration. This can be null, and otherwise
-     *                          is a {@link java.util.Map} of key-value pairs. The keys should
-     *                          be limited to the ones required by the combiner.
-     * @return instance of the current builder
-     */
-    public T setCombiner(String combinerClassName, @Nullable Map<String, String> combinerConf);
-
-
-
-    /**
-     * Configure the number of threads to be used by the sorter
-     *
-     * @param numThreads the number of threads
-     * @return instance of the current builder
-     */
-    public T setSorterNumThreads(int numThreads);
-  }
-
-  @SuppressWarnings("rawtypes")
-  @InterfaceAudience.Public
-  @InterfaceStability.Evolving
-  public static class SpecificBuilder<E extends HadoopKeyValuesBasedBaseEdgeConfigurer.Builder> implements
-      SpecificConfigurer<SpecificBuilder> {
-
-    private final E edgeBuilder;
-    private final Builder builder;
-
-    SpecificBuilder(E edgeBuilder, Builder builder) {
-      this.edgeBuilder = edgeBuilder;
-      this.builder = builder;
-    }
-
-    @Override
-    public SpecificBuilder<E> setSortBufferSize(int sortBufferSize) {
-      builder.setSortBufferSize(sortBufferSize);
-      return this;
-    }
-
-    public SpecificBuilder<E> setCombiner(String combinerClassName) {
-      return this.setCombiner(combinerClassName, null);
-    }
-
-    @Override
-    public SpecificBuilder<E> setCombiner(String combinerClassName, Map<String, String> combinerConf) {
-      builder.setCombiner(combinerClassName, combinerConf);
-      return this;
-    }
-
-    @Override
-    public SpecificBuilder<E> setSorterNumThreads(int numThreads) {
-      builder.setSorterNumThreads(numThreads);
-      return this;
-    }
-
-    @Override
-    public SpecificBuilder setAdditionalConfiguration(String key, String value) {
-      builder.setAdditionalConfiguration(key, value);
-      return this;
-    }
-
-    @Override
-    public SpecificBuilder setAdditionalConfiguration(Map<String, String> confMap) {
-      builder.setAdditionalConfiguration(confMap);
-      return this;
-    }
-
-    @Override
-    public SpecificBuilder setFromConfiguration(Configuration conf) {
-      builder.setFromConfiguration(conf);
-      return this;
-    }
-
-    public E done() {
-      return edgeBuilder;
-    }
-  }
-
-  @InterfaceAudience.Private
-  @VisibleForTesting
-  Configuration conf;
-
-  @InterfaceAudience.Private
-  @VisibleForTesting
-  OnFileSortedOutputConfigurer() {
-  }
-
-  private OnFileSortedOutputConfigurer(Configuration conf) {
-    this.conf = conf;
-  }
-
-  /**
-   * Get a UserPayload representation of the Configuration
-   * @return a {@link org.apache.tez.dag.api.UserPayload} instance
-   */
-  public UserPayload toUserPayload() {
-    try {
-      return TezUtils.createUserPayloadFromConf(conf);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @InterfaceAudience.Private
-  public void fromUserPayload(UserPayload payload) {
-    try {
-      this.conf = TezUtils.createConfFromUserPayload(payload);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  public static Builder newBuilder(String keyClass, String valueClass, String partitionerClassName) {
-    return newBuilder(keyClass, valueClass, partitionerClassName, null);
-  }
-
-  public static Builder newBuilder(String keyClass, String valueClass, String partitionerClassName,
-                                   Map<String, String> partitionerConf) {
-    return new Builder(keyClass, valueClass, partitionerClassName, partitionerConf);
-  }
-
-  @InterfaceAudience.Public
-  @InterfaceStability.Evolving
-  public static class Builder implements SpecificConfigurer<Builder> {
-
-    private final Configuration conf = new Configuration(false);
-
-    /**
-     * Create a configuration builder for {@link org.apache.tez.runtime.library.output.OnFileSortedOutput}
-     *
-     * @param keyClassName         the key class name
-     * @param valueClassName       the value class name
-     * @param partitionerClassName the partitioner class name
-     * @param partitionerConf      the partitioner configuration. This can be null, and is a {@link
-     *                             java.util.Map} of key-value pairs. The keys should be limited to
-     *                             the ones required by the partitioner.
-     */
-    @InterfaceAudience.Private
-    Builder(String keyClassName, String valueClassName, String partitionerClassName,
-                   @Nullable Map<String, String> partitionerConf) {
-      this();
-      Preconditions.checkNotNull(keyClassName, "Key class name cannot be null");
-      Preconditions.checkNotNull(valueClassName, "Value class name cannot be null");
-      Preconditions.checkNotNull(partitionerClassName, "Partitioner class name cannot be null");
-      setKeyClassName(keyClassName);
-      setValueClassName(valueClassName);
-      setPartitioner(partitionerClassName, partitionerConf);
-    }
-
-    @InterfaceAudience.Private
-    Builder() {
-      Map<String, String> tezDefaults = ConfigUtils
-          .extractConfigurationMap(TezRuntimeConfiguration.getTezRuntimeConfigDefaults(),
-              OnFileSortedOutput.getConfigurationKeySet());
-      ConfigUtils.addConfigMapToConfiguration(this.conf, tezDefaults);
-      ConfigUtils.addConfigMapToConfiguration(this.conf, TezRuntimeConfiguration.getOtherConfigDefaults());
-    }
-
-    @InterfaceAudience.Private
-    Builder setKeyClassName(String keyClassName) {
-      Preconditions.checkNotNull(keyClassName, "Key class name cannot be null");
-      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, keyClassName);
-      return this;
-    }
-
-    @InterfaceAudience.Private
-    Builder setValueClassName(String valueClassName) {
-      Preconditions.checkNotNull(valueClassName, "Value class name cannot be null");
-      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, valueClassName);
-      return this;
-    }
-
-    @InterfaceAudience.Private
-    Builder setPartitioner(String partitionerClassName, @Nullable Map<String, String> partitionerConf) {
-      Preconditions.checkNotNull(partitionerClassName, "Partitioner class name cannot be null");
-      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, partitionerClassName);
-      if (partitionerConf != null) {
-        // Merging the confs for now. Change to be specific in the future.
-        ConfigUtils.mergeConfsWithExclusions(this.conf, partitionerConf,
-            TezRuntimeConfiguration.getRuntimeConfigKeySet());
-      }
-      return this;
-    }
-
-    @Override
-    public Builder setSortBufferSize(int sortBufferSize) {
-      this.conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, sortBufferSize);
-      return this;
-    }
-
-    @Override
-    public Builder setCombiner(String combinerClassName) {
-      return this.setCombiner(combinerClassName, null);
-    }
-
-    @Override
-    public Builder setCombiner(String combinerClassName, Map<String, String> combinerConf) {
-      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS, combinerClassName);
-      if (combinerConf != null) {
-        // Merging the confs for now. Change to be specific in the future.
-        ConfigUtils.mergeConfsWithExclusions(this.conf, combinerConf,
-            TezRuntimeConfiguration.getRuntimeConfigKeySet());
-      }
-      return this;
-    }
-
-    @Override
-    public Builder setSorterNumThreads(int numThreads) {
-      this.conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS, numThreads);
-      return this;
-    }
-
-    @Override
-    public Builder setAdditionalConfiguration(String key, String value) {
-      Preconditions.checkNotNull(key, "Key cannot be null");
-      if (ConfigUtils.doesKeyQualify(key,
-          Lists.newArrayList(OnFileSortedOutput.getConfigurationKeySet(),
-              TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()),
-          TezRuntimeConfiguration.getAllowedPrefixes())) {
-        if (value == null) {
-          this.conf.unset(key);
-        } else {
-          this.conf.set(key, value);
-        }
-      }
-      return this;
-    }
-
-    @Override
-    public Builder setAdditionalConfiguration(Map<String, String> confMap) {
-      Preconditions.checkNotNull(confMap, "ConfMap cannot be null");
-      Map<String, String> map = ConfigUtils.extractConfigurationMap(confMap,
-          Lists.newArrayList(OnFileSortedOutput.getConfigurationKeySet(),
-              TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()), TezRuntimeConfiguration.getAllowedPrefixes());
-      ConfigUtils.addConfigMapToConfiguration(this.conf, map);
-      return this;
-    }
-
-    @Override
-    public Builder setFromConfiguration(Configuration conf) {
-      // Maybe ensure this is the first call ? Otherwise this can end up overriding other parameters
-      Preconditions.checkArgument(conf != null, "Configuration cannot be null");
-      Map<String, String> map = ConfigUtils.extractConfigurationMap(conf,
-          Lists.newArrayList(OnFileSortedOutput.getConfigurationKeySet(),
-              TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()), TezRuntimeConfiguration.getAllowedPrefixes());
-      ConfigUtils.addConfigMapToConfiguration(this.conf, map);
-      return this;
-    }
-
-    /**
-     * Set the key comparator class
-     *
-     * @param comparatorClassName the key comparator class name
-     * @return instance of the current builder
-     */
-    public Builder setKeyComparatorClass(String comparatorClassName) {
-      return this.setKeyComparatorClass(comparatorClassName, null);
-    }
-
-    /**
-     * Set the key comparator class and it's associated configuration. This method should only be
-     * used if the comparator requires some specific configuration, which is typically not the
-     * case. {@link #setKeyComparatorClass(String)} is the preferred method for setting a
-     * comparator.
-     *
-     * @param comparatorClassName the key comparator class name
-     * @param comparatorConf      the comparator configuration. This can be null, and is a {@link
-     *                            java.util.Map} of key-value pairs. The keys should be limited to
-     *                            the ones required by the comparator.
-     * @return instance of the current builder
-     */
-    public Builder setKeyComparatorClass(String comparatorClassName,
-                                         @Nullable Map<String, String> comparatorConf) {
-      Preconditions.checkNotNull(comparatorClassName, "Comparator class name cannot be null");
-      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS,
-          comparatorClassName);
-      if (comparatorConf != null) {
-        // Merging the confs for now. Change to be specific in the future.
-        ConfigUtils.mergeConfsWithExclusions(this.conf, comparatorConf,
-            TezRuntimeConfiguration.getRuntimeConfigKeySet());
-      }
-      return this;
-    }
-
-    public Builder setCompression(boolean enabled, @Nullable String compressionCodec) {
-      this.conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, enabled);
-      if (enabled && compressionCodec != null) {
-        this.conf
-            .set(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, compressionCodec);
-      }
-      return this;
-    }
-
-    /**
-     * Set serialization class and the relevant comparator to be used for sorting.
-     * Providing custom serialization class could change the way, keys needs to be compared in
-     * sorting. Providing invalid comparator here could create invalid results.
-     *
-     * @param serializationClassName
-     * @param comparatorClassName
-     * @return
-     */
-    public Builder setKeySerializationClass(String serializationClassName,
-        String comparatorClassName) {
-      Preconditions.checkArgument(serializationClassName != null,
-          "serializationClassName cannot be null");
-      Preconditions.checkArgument(comparatorClassName != null,
-          "comparator cannot be null");
-      this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + ","
-          + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
-      setKeyComparatorClass(comparatorClassName, null);
-      return this;
-    }
-
-    /**
-     * Set serialization class responsible for providing serializer/deserializer for values.
-     *
-     * @param serializationClassName
-     * @return
-     */
-    public Builder setValueSerializationClass(String serializationClassName) {
-      Preconditions.checkArgument(serializationClassName != null,
-          "serializationClassName cannot be null");
-      this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + ","
-          + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
-      return this;
-    }
-
-    /**
-     * Create the actual configuration instance.
-     *
-     * @return an instance of the Configuration
-     */
-    public OnFileSortedOutputConfigurer build() {
-      return new OnFileSortedOutputConfigurer(this.conf);
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/tez/blob/469bf905/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedKVOutputConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedKVOutputConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedKVOutputConfigurer.java
deleted file mode 100644
index 42634eb..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedKVOutputConfigurer.java
+++ /dev/null
@@ -1,259 +0,0 @@
-/*
- * *
- *  * Licensed to the Apache Software Foundation (ASF) under one
- *  * or more contributor license agreements.  See the NOTICE file
- *  * distributed with this work for additional information
- *  * regarding copyright ownership.  The ASF licenses this file
- *  * to you under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance
- *  * with the License.  You may obtain a copy of the License at
- *  *
- *  *     http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.tez.runtime.library.conf;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.util.Map;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
-import org.apache.tez.runtime.library.common.ConfigUtils;
-import org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput;
-
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-/**
- * Configure {@link org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput} </p>
- *
- * Values will be picked up from tez-site if not specified, otherwise defaults from
- * {@link org.apache.tez.runtime.library.api.TezRuntimeConfiguration} will be used.
- */
-public class OnFileUnorderedKVOutputConfigurer {
-  /**
-   * Configure parameters which are specific to the Output.
-   */
-  @InterfaceAudience.Private
-  public static interface SpecificConfigurer<T> extends BaseConfigurer<T> {
-  }
-
-  @InterfaceAudience.Public
-  @InterfaceStability.Evolving
-  public static class SpecificBuilder<E extends HadoopKeyValuesBasedBaseEdgeConfigurer.Builder> implements
-      SpecificConfigurer<SpecificBuilder> {
-
-    private final E edgeBuilder;
-    private final Builder builder;
-
-    SpecificBuilder(E edgeBuilder, Builder builder) {
-      this.edgeBuilder = edgeBuilder;
-      this.builder = builder;
-    }
-
-    @Override
-    public SpecificBuilder setAdditionalConfiguration(String key, String value) {
-      builder.setAdditionalConfiguration(key, value);
-      return this;
-    }
-
-    @Override
-    public SpecificBuilder setAdditionalConfiguration(Map<String, String> confMap) {
-      builder.setAdditionalConfiguration(confMap);
-      return this;
-    }
-
-    @Override
-    public SpecificBuilder setFromConfiguration(Configuration conf) {
-      builder.setFromConfiguration(conf);
-      return this;
-    }
-
-    public E done() {
-      return edgeBuilder;
-    }
-  }
-
-  @InterfaceAudience.Private
-  @VisibleForTesting
-  Configuration conf;
-
-  @InterfaceAudience.Private
-  @VisibleForTesting
-  OnFileUnorderedKVOutputConfigurer() {
-  }
-
-  private OnFileUnorderedKVOutputConfigurer(Configuration conf) {
-    this.conf = conf;
-  }
-
-  /**
-   * Get a UserPayload representation of the Configuration
-   * @return a {@link org.apache.tez.dag.api.UserPayload} instance
-   */
-  public UserPayload toUserPayload() {
-    try {
-      return TezUtils.createUserPayloadFromConf(conf);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @InterfaceAudience.Private
-  public void fromUserPayload(UserPayload payload) {
-    try {
-      this.conf = TezUtils.createConfFromUserPayload(payload);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  public static Builder newBuilder(String keyClass, String valClass) {
-    return new Builder(keyClass, valClass);
-  }
-
-  @InterfaceAudience.Public
-  @InterfaceStability.Evolving
-  public static class Builder implements SpecificConfigurer<Builder> {
-
-    private final Configuration conf = new Configuration(false);
-
-    /**
-     * Create a configuration builder for {@link org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput}
-     *
-     * @param keyClassName         the key class name
-     * @param valueClassName       the value class name
-     */
-    @InterfaceAudience.Private
-    Builder(String keyClassName, String valueClassName) {
-      this();
-      Preconditions.checkNotNull(keyClassName, "Key class name cannot be null");
-      Preconditions.checkNotNull(valueClassName, "Value class name cannot be null");
-      setKeyClassName(keyClassName);
-      setValueClassName(valueClassName);
-    }
-
-    @InterfaceAudience.Private
-    Builder() {
-      Map<String, String> tezDefaults = ConfigUtils
-          .extractConfigurationMap(TezRuntimeConfiguration.getTezRuntimeConfigDefaults(),
-              OnFileUnorderedKVOutput.getConfigurationKeySet());
-      ConfigUtils.addConfigMapToConfiguration(this.conf, tezDefaults);
-      ConfigUtils.addConfigMapToConfiguration(this.conf, TezRuntimeConfiguration.getOtherConfigDefaults());
-    }
-
-    @InterfaceAudience.Private
-    Builder setKeyClassName(String keyClassName) {
-      Preconditions.checkNotNull(keyClassName, "Key class name cannot be null");
-      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, keyClassName);
-      return this;
-    }
-
-    @InterfaceAudience.Private
-    Builder setValueClassName(String valueClassName) {
-      Preconditions.checkNotNull(valueClassName, "Value class name cannot be null");
-      this.conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, valueClassName);
-      return this;
-    }
-
-    @Override
-    public Builder setAdditionalConfiguration(String key, String value) {
-      Preconditions.checkNotNull(key, "Key cannot be null");
-      if (ConfigUtils.doesKeyQualify(key,
-          Lists.newArrayList(OnFileUnorderedKVOutput.getConfigurationKeySet(),
-              TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()),
-          TezRuntimeConfiguration.getAllowedPrefixes())) {
-        if (value == null) {
-          this.conf.unset(key);
-        } else {
-          this.conf.set(key, value);
-        }
-      }
-      return this;
-    }
-
-    @Override
-    public Builder setAdditionalConfiguration(Map<String, String> confMap) {
-      Preconditions.checkNotNull(confMap, "ConfMap cannot be null");
-      Map<String, String> map = ConfigUtils.extractConfigurationMap(confMap,
-          Lists.newArrayList(OnFileUnorderedKVOutput.getConfigurationKeySet(),
-              TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()), TezRuntimeConfiguration.getAllowedPrefixes());
-      ConfigUtils.addConfigMapToConfiguration(this.conf, map);
-      return this;
-    }
-
-    @Override
-    public Builder setFromConfiguration(Configuration conf) {
-      // Maybe ensure this is the first call ? Otherwise this can end up overriding other parameters
-      Preconditions.checkArgument(conf != null, "Configuration cannot be null");
-      Map<String, String> map = ConfigUtils.extractConfigurationMap(conf,
-          Lists.newArrayList(OnFileUnorderedKVOutput.getConfigurationKeySet(),
-              TezRuntimeConfiguration.getRuntimeAdditionalConfigKeySet()), TezRuntimeConfiguration.getAllowedPrefixes());
-      ConfigUtils.addConfigMapToConfiguration(this.conf, map);
-      return this;
-    }
-
-    /**
-     * Set serialization class responsible for providing serializer/deserializer for keys.
-     *
-     * @param serializationClassName
-     * @return
-     */
-    public Builder setKeySerializationClass(String serializationClassName) {
-      Preconditions.checkArgument(serializationClassName != null,
-          "serializationClassName cannot be null");
-      this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + ","
-          + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
-      return this;
-    }
-
-    /**
-     * Set serialization class responsible for providing serializer/deserializer for values.
-     *
-     * @param serializationClassName
-     * @return
-     */
-    public Builder setValueSerializationClass(String serializationClassName) {
-      Preconditions.checkArgument(serializationClassName != null,
-          "serializationClassName cannot be null");
-      this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + ","
-          + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
-      return this;
-    }
-
-    public Builder setCompression(boolean enabled, @Nullable String compressionCodec) {
-      this.conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, enabled);
-      if (enabled && compressionCodec != null) {
-        this.conf
-            .set(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, compressionCodec);
-      }
-      return this;
-    }
-
-    /**
-     * Create the actual configuration instance.
-     *
-     * @return an instance of the Configuration
-     */
-    public OnFileUnorderedKVOutputConfigurer build() {
-      return new OnFileUnorderedKVOutputConfigurer(this.conf);
-    }
-  }
-}


Mime
View raw message