tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject git commit: TEZ-698. Make it easy to create and configure MRInput/MROutput and other inputs/outputs (bikas)
Date Tue, 22 Apr 2014 20:12:17 GMT
Repository: incubator-tez
Updated Branches:
  refs/heads/master 0a681b34e -> 465dea1cc


TEZ-698. Make it easy to create and configure MRInput/MROutput and other inputs/outputs (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/465dea1c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/465dea1c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/465dea1c

Branch: refs/heads/master
Commit: 465dea1cca9b6030bd6cbe23826288ae21b38497
Parents: 0a681b3
Author: Bikas Saha <bikas@apache.org>
Authored: Tue Apr 22 13:12:07 2014 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Tue Apr 22 13:12:07 2014 -0700

----------------------------------------------------------------------
 .../apache/tez/dag/api/client/DAGClient.java    |   1 -
 .../dag/api/client/rpc/DAGClientRPCImpl.java    |   1 -
 .../org/apache/tez/dag/app/DAGAppMaster.java    |   1 -
 .../tez/mapreduce/examples/UnionExample.java    | 159 +++++++------------
 .../tez/mapreduce/examples/WordCount.java       |  87 +++-------
 .../apache/tez/mapreduce/hadoop/MRHelpers.java  |  53 +++++++
 .../org/apache/tez/mapreduce/input/MRInput.java |  48 ++++++
 .../apache/tez/mapreduce/output/MROutput.java   |  21 +++
 8 files changed, 205 insertions(+), 166 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/465dea1c/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
index 32f2712..74a5b77 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
@@ -20,7 +20,6 @@ package org.apache.tez.dag.api.client;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.EnumSet;
 import java.util.Set;
 import javax.annotation.Nullable;
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/465dea1c/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
index 711ffb6..907e632 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
@@ -26,7 +26,6 @@ import java.util.HashSet;
 import java.util.Set;
 import javax.annotation.Nullable;
 
-import static com.google.common.base.Preconditions.checkNotNull;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.ipc.RPC;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/465dea1c/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 68b019e..eba41fe 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -1537,7 +1537,6 @@ public class DAGAppMaster extends AbstractService {
     return null;
   }
 
-  @SuppressWarnings("unchecked")
   @Override
   public synchronized void serviceStart() throws Exception {
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/465dea1c/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
index d73bbe1..91eb297 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
@@ -19,7 +19,6 @@ package org.apache.tez.mapreduce.examples;
 
 import java.io.IOException;
 import java.util.EnumSet;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.StringTokenizer;
@@ -32,13 +31,11 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileAlreadyExistsException;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.mapreduce.split.TezGroupedSplitsInputFormat;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -68,16 +65,12 @@ import org.apache.tez.dag.api.client.StatusGetOpts;
 import org.apache.tez.mapreduce.committer.MROutputCommitter;
 import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
-import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
 import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
 import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.LogicalIOProcessor;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.TezProcessorContext;
-import org.apache.tez.runtime.api.TezRootInputInitializer;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
 import org.apache.tez.runtime.library.api.KeyValuesReader;
@@ -90,15 +83,12 @@ import com.google.common.collect.Maps;
 
 public class UnionExample {
 
-  public static class TokenProcessor implements LogicalIOProcessor {
-    TezProcessorContext context;
+  public static class TokenProcessor extends AbstractLogicalIOProcessor {
     IntWritable one = new IntWritable(1);
     Text word = new Text();
 
     @Override
-    public void initialize(TezProcessorContext processorContext)
-        throws Exception {
-      this.context = processorContext;
+    public void initialize() throws Exception {
     }
 
     @Override
@@ -120,7 +110,7 @@ public class UnionExample {
         output.start();
       }
       boolean inUnion = true;
-      if (context.getTaskVertexName().equals("map3")) {
+      if (getContext().getTaskVertexName().equals("map3")) {
         inUnion = false;
       }
       Preconditions.checkArgument(outputs.size() == (inUnion ? 2 : 1));
@@ -147,7 +137,7 @@ public class UnionExample {
       }
       if (inUnion) {
         if (parts.isCommitRequired()) {
-          while (!context.canCommit()) {
+          while (!getContext().canCommit()) {
             Thread.sleep(100);
           }
           parts.commit();
@@ -157,14 +147,11 @@ public class UnionExample {
     
   }
   
-  public static class UnionProcessor implements LogicalIOProcessor {
-    TezProcessorContext context;
+  public static class UnionProcessor extends AbstractLogicalIOProcessor {
     IntWritable one = new IntWritable(1);
     
     @Override
-    public void initialize(TezProcessorContext processorContext)
-        throws Exception {
-      this.context = processorContext;
+    public void initialize() throws Exception {
     }
 
     @Override
@@ -225,13 +212,13 @@ public class UnionExample {
       }
       kvWriter.write("Union", new IntWritable(unionKv.size()));
       if (out.isCommitRequired()) {
-        while (!context.canCommit()) {
+        while (!getContext().canCommit()) {
           Thread.sleep(100);
         }
         out.commit();
       }
       if (allParts.isCommitRequired()) {
-        while (!context.canCommit()) {
+        while (!getContext().canCommit()) {
           Thread.sleep(100);
         }
         allParts.commit();
@@ -243,100 +230,70 @@ public class UnionExample {
   private DAG createDAG(FileSystem fs, TezConfiguration tezConf,
       Map<String, LocalResource> localResources, Path stagingDir,
       String inputPath, String outputPath) throws IOException {
-    Configuration mapStageConf = new JobConf((Configuration)tezConf);
-    mapStageConf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
-        Text.class.getName());
-    mapStageConf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
-        IntWritable.class.getName());
-    mapStageConf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR, 
-        TezGroupedSplitsInputFormat.class.getName());
-
-    mapStageConf.set(FileInputFormat.INPUT_DIR, inputPath);
-    mapStageConf.setBoolean("mapred.mapper.new-api", true);
-
-    MultiStageMRConfToTezTranslator.translateVertexConfToTez(mapStageConf,
-        null);
-
-    Configuration finalReduceConf = new JobConf((Configuration)tezConf);
-    finalReduceConf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
-        Text.class.getName());
-    finalReduceConf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
-        IntWritable.class.getName());
-    finalReduceConf.set(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR,
-        TextOutputFormat.class.getName());
-    finalReduceConf.set(FileOutputFormat.OUTDIR, outputPath);
-    finalReduceConf.setBoolean("mapred.mapper.new-api", true);
-
-    MultiStageMRConfToTezTranslator.translateVertexConfToTez(finalReduceConf,
-        mapStageConf);
-
-    MRHelpers.doJobClientMagic(mapStageConf);
-    MRHelpers.doJobClientMagic(finalReduceConf);
-
-    byte[] mapPayload = MRHelpers.createUserPayloadFromConf(mapStageConf);
-    byte[] mapInputPayload = MRHelpers.createMRInputPayloadWithGrouping(mapPayload, 
-            TextInputFormat.class.getName());
+    DAG dag = new DAG("UnionExample");
+    
     int numMaps = -1;
+    Configuration inputConf = new Configuration(tezConf);
+    inputConf.set(FileInputFormat.INPUT_DIR, inputPath);
+    InputDescriptor id = new InputDescriptor(MRInput.class.getName())
+        .setUserPayload(MRInput.createUserPayload(inputConf,
+            TextInputFormat.class.getName(), true, true));
+
     Vertex mapVertex1 = new Vertex("map1", new ProcessorDescriptor(
         TokenProcessor.class.getName()),
-        numMaps, MRHelpers.getMapResource(mapStageConf));
-    mapVertex1.setJavaOpts(MRHelpers.getMapJavaOpts(mapStageConf));
-    Map<String, String> mapEnv = new HashMap<String, String>();
-    MRHelpers.updateEnvironmentForMRTasks(mapStageConf, mapEnv, true);
-    mapVertex1.setTaskEnvironment(mapEnv);
-    Class<? extends TezRootInputInitializer> initializerClazz = MRInputAMSplitGenerator.class;
-    InputDescriptor id = new InputDescriptor(MRInput.class.getName()).
-        setUserPayload(mapInputPayload);
-    mapVertex1.addInput("MRInput", id, initializerClazz);
+        numMaps, MRHelpers.getMapResource(tezConf));
+    mapVertex1.setJavaOpts(MRHelpers.getMapJavaOpts(tezConf));
+    mapVertex1.addInput("MRInput", id, MRInputAMSplitGenerator.class);
 
     Vertex mapVertex2 = new Vertex("map2", new ProcessorDescriptor(
         TokenProcessor.class.getName()),
-        numMaps, MRHelpers.getMapResource(mapStageConf));
-    mapVertex2.setJavaOpts(MRHelpers.getMapJavaOpts(mapStageConf));
-    MRHelpers.updateEnvironmentForMRTasks(mapStageConf, mapEnv, true);
-    mapVertex2.setTaskEnvironment(mapEnv);
-    mapVertex2.addInput("MRInput", id, initializerClazz);
+        numMaps, MRHelpers.getMapResource(tezConf));
+    mapVertex2.setJavaOpts(MRHelpers.getMapJavaOpts(tezConf));
+    mapVertex2.addInput("MRInput", id, MRInputAMSplitGenerator.class);
 
     Vertex mapVertex3 = new Vertex("map3", new ProcessorDescriptor(
         TokenProcessor.class.getName()),
-        numMaps, MRHelpers.getMapResource(mapStageConf));
-    mapVertex3.setJavaOpts(MRHelpers.getMapJavaOpts(mapStageConf));
-    MRHelpers.updateEnvironmentForMRTasks(mapStageConf, mapEnv, true);
-    mapVertex3.setTaskEnvironment(mapEnv);
-    mapVertex3.addInput("MRInput", id, initializerClazz);
-    
-    byte[] finalReducePayload = MRHelpers.createUserPayloadFromConf(finalReduceConf);
+        numMaps, MRHelpers.getMapResource(tezConf));
+    mapVertex3.setJavaOpts(MRHelpers.getMapJavaOpts(tezConf));
+    mapVertex3.addInput("MRInput", id, MRInputAMSplitGenerator.class);
+
     Vertex checkerVertex = new Vertex("checker",
         new ProcessorDescriptor(
-            UnionProcessor.class.getName()).setUserPayload(finalReducePayload),
-                1, MRHelpers.getReduceResource(finalReduceConf));
+            UnionProcessor.class.getName()),
+                1, MRHelpers.getReduceResource(tezConf));
     checkerVertex.setJavaOpts(
-        MRHelpers.getReduceJavaOpts(finalReduceConf));
-    Map<String, String> reduceEnv = new HashMap<String, String>();
-    MRHelpers.updateEnvironmentForMRTasks(finalReduceConf, reduceEnv, false);
-    checkerVertex.setTaskEnvironment(reduceEnv);
+        MRHelpers.getReduceJavaOpts(tezConf));
+
+    Configuration outputConf = new Configuration(tezConf);
+    outputConf.set(FileOutputFormat.OUTDIR, outputPath);
     OutputDescriptor od = new OutputDescriptor(MROutput.class.getName())
-      .setUserPayload(finalReducePayload);
+      .setUserPayload(MROutput.createUserPayload(
+          outputConf, TextOutputFormat.class.getName(), true));
     checkerVertex.addOutput("union", od, MROutputCommitter.class);
 
-    Configuration partsConf = new Configuration(finalReduceConf);
+    Configuration allPartsConf = new Configuration(tezConf);
+    allPartsConf.set(FileOutputFormat.OUTDIR, outputPath+"-all-parts");
+    OutputDescriptor od2 = new OutputDescriptor(MROutput.class.getName())
+      .setUserPayload(MROutput.createUserPayload(
+          allPartsConf, TextOutputFormat.class.getName(), true));
+    checkerVertex.addOutput("all-parts", od2, MROutputCommitter.class);
+
+    Configuration partsConf = new Configuration(tezConf);
     partsConf.set(FileOutputFormat.OUTDIR, outputPath+"-parts");
-    byte[] partsPayload = MRHelpers.createUserPayloadFromConf(partsConf);
-    
-    DAG dag = new DAG("UnionExample");
     
     VertexGroup unionVertex = dag.createVertexGroup("union", mapVertex1, mapVertex2);
     OutputDescriptor od1 = new OutputDescriptor(MROutput.class.getName())
-      .setUserPayload(partsPayload);
-    Configuration allPartsConf = new Configuration(finalReduceConf);
-    allPartsConf.set(FileOutputFormat.OUTDIR, outputPath+"-all-parts");
-    byte[] allPartsPayload = MRHelpers.createUserPayloadFromConf(allPartsConf);
-    OutputDescriptor od2 = new OutputDescriptor(MROutput.class.getName())
-      .setUserPayload(allPartsPayload);
+      .setUserPayload(MROutput.createUserPayload(
+          partsConf, TextOutputFormat.class.getName(), true));
     unionVertex.addOutput("parts", od1, MROutputCommitter.class);
-    checkerVertex.addOutput("all-parts", od2, MROutputCommitter.class);
-    
     
+    byte[] intermediateDataPayloadIn = 
+        MRHelpers.createMRIntermediateDataPayload(tezConf, Text.class.getName(), 
+            IntWritable.class.getName(), true, null, null);
+    byte[] intermediateDataPayloadOut = 
+        MRHelpers.createMRIntermediateDataPayload(tezConf, Text.class.getName(), 
+            IntWritable.class.getName(), true, null, null);
+
     dag.addVertex(mapVertex1)
         .addVertex(mapVertex2)
         .addVertex(mapVertex3)
@@ -346,17 +303,17 @@ public class UnionExample {
                 DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
                 SchedulingType.SEQUENTIAL, 
                 new OutputDescriptor(OnFileSortedOutput.class.getName())
-                        .setUserPayload(mapPayload), 
+                        .setUserPayload(intermediateDataPayloadIn), 
                 new InputDescriptor(ShuffledMergedInput.class.getName())
-                        .setUserPayload(finalReducePayload))))
+                        .setUserPayload(intermediateDataPayloadOut))))
         .addEdge(
             new GroupInputEdge(unionVertex, checkerVertex, new EdgeProperty(
                 DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
                 SchedulingType.SEQUENTIAL,
                 new OutputDescriptor(OnFileSortedOutput.class.getName())
-                    .setUserPayload(mapPayload), 
+                    .setUserPayload(intermediateDataPayloadIn), 
                 new InputDescriptor(ShuffledMergedInput.class.getName())
-                    .setUserPayload(finalReducePayload)),
+                    .setUserPayload(intermediateDataPayloadOut)),
                 new InputDescriptor(
                     ConcatenatedMergedKeyValuesInput.class.getName())));
     return dag;  
@@ -395,7 +352,7 @@ public class UnionExample {
     // security
     TokenCache.obtainTokensForNamenodes(credentials, new Path[] {stagingDir}, tezConf);
     TezClientUtils.ensureStagingDirExists(tezConf, stagingDir);
-
+ 
     tezConf.set(TezConfiguration.TEZ_AM_JAVA_OPTS,
         MRHelpers.getMRAMJavaOpts(tezConf));
 
@@ -446,7 +403,7 @@ public class UnionExample {
   }
 
   public static void main(String[] args) throws Exception {
-    if ((args.length%2) != 0) {
+    if (args.length != 2) {
       printUsage();
       System.exit(2);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/465dea1c/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
index 7dade9b..0e8aaba 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
@@ -18,7 +18,6 @@
 package org.apache.tez.mapreduce.examples;
 
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.StringTokenizer;
@@ -30,13 +29,11 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileAlreadyExistsException;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.mapreduce.split.TezGroupedSplitsInputFormat;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.GenericOptionsParser;
@@ -63,15 +60,12 @@ import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.mapreduce.committer.MROutputCommitter;
 import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
-import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
 import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.TezRootInputInitializer;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
 import org.apache.tez.runtime.library.api.KeyValuesReader;
@@ -178,64 +172,33 @@ public class WordCount {
   private DAG createDAG(FileSystem fs, TezConfiguration tezConf,
       Map<String, LocalResource> localResources, Path stagingDir,
       String inputPath, String outputPath) throws IOException {
-    Configuration mapStageConf = new JobConf((Configuration)tezConf);
-    mapStageConf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
-        Text.class.getName());
-    mapStageConf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
-        IntWritable.class.getName());
-    mapStageConf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR, 
-        TezGroupedSplitsInputFormat.class.getName());
-
-    mapStageConf.set(FileInputFormat.INPUT_DIR, inputPath);
-    mapStageConf.setBoolean("mapred.mapper.new-api", true);
-
-    MultiStageMRConfToTezTranslator.translateVertexConfToTez(mapStageConf,
-        null);
-
-    Configuration finalReduceConf = new JobConf((Configuration)tezConf);
-    finalReduceConf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
-        Text.class.getName());
-    finalReduceConf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
-        IntWritable.class.getName());
-    finalReduceConf.set(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR,
-        TextOutputFormat.class.getName());
-    finalReduceConf.set(FileOutputFormat.OUTDIR, outputPath);
-    finalReduceConf.setBoolean("mapred.mapper.new-api", false);
-
-    MultiStageMRConfToTezTranslator.translateVertexConfToTez(finalReduceConf,
-        mapStageConf);
-
-    MRHelpers.doJobClientMagic(mapStageConf);
-    MRHelpers.doJobClientMagic(finalReduceConf);
-
-    byte[] mapPayload = MRHelpers.createUserPayloadFromConf(mapStageConf);
-    byte[] mapInputPayload = MRHelpers.createMRInputPayloadWithGrouping(mapPayload, 
-            TextInputFormat.class.getName());
-    int numMaps = -1;
+
+    Configuration inputConf = new Configuration(tezConf);
+    inputConf.set(FileInputFormat.INPUT_DIR, inputPath);
+    InputDescriptor id = new InputDescriptor(MRInput.class.getName())
+        .setUserPayload(MRInput.createUserPayload(inputConf,
+            TextInputFormat.class.getName(), true, true));
+
+    Configuration outputConf = new Configuration(tezConf);
+    outputConf.set(FileOutputFormat.OUTDIR, outputPath);
+    OutputDescriptor od = new OutputDescriptor(MROutput.class.getName())
+      .setUserPayload(MROutput.createUserPayload(
+          outputConf, TextOutputFormat.class.getName(), true));
+    
+    byte[] intermediateDataPayload = 
+        MRHelpers.createMRIntermediateDataPayload(tezConf, Text.class.getName(), 
+            IntWritable.class.getName(), true, null, null);
+    
     Vertex tokenizerVertex = new Vertex("tokenizer", new ProcessorDescriptor(
-        TokenProcessor.class.getName()),
-        numMaps, MRHelpers.getMapResource(mapStageConf));
-    tokenizerVertex.setJavaOpts(MRHelpers.getMapJavaOpts(mapStageConf));
-    Map<String, String> mapEnv = new HashMap<String, String>();
-    MRHelpers.updateEnvironmentForMRTasks(mapStageConf, mapEnv, true);
-    tokenizerVertex.setTaskEnvironment(mapEnv);
-    Class<? extends TezRootInputInitializer> initializerClazz = MRInputAMSplitGenerator.class;
-    InputDescriptor id = new InputDescriptor(MRInput.class.getName()).
-        setUserPayload(mapInputPayload);
-    tokenizerVertex.addInput("MRInput", id, initializerClazz);
-
-    byte[] finalReducePayload = MRHelpers.createUserPayloadFromConf(finalReduceConf);
+        TokenProcessor.class.getName()), -1, MRHelpers.getMapResource(tezConf));
+    tokenizerVertex.setJavaOpts(MRHelpers.getMapJavaOpts(tezConf));
+    tokenizerVertex.addInput("MRInput", id, MRInputAMSplitGenerator.class);
+
     Vertex summerVertex = new Vertex("summer",
         new ProcessorDescriptor(
-            SumProcessor.class.getName()).setUserPayload(finalReducePayload),
-                1, MRHelpers.getReduceResource(finalReduceConf));
+            SumProcessor.class.getName()), 1, MRHelpers.getReduceResource(tezConf));
     summerVertex.setJavaOpts(
-        MRHelpers.getReduceJavaOpts(finalReduceConf));
-    Map<String, String> reduceEnv = new HashMap<String, String>();
-    MRHelpers.updateEnvironmentForMRTasks(finalReduceConf, reduceEnv, false);
-    summerVertex.setTaskEnvironment(reduceEnv);
-    OutputDescriptor od = new OutputDescriptor(MROutput.class.getName())
-        .setUserPayload(finalReducePayload);
+        MRHelpers.getReduceJavaOpts(tezConf));
     summerVertex.addOutput("MROutput", od, MROutputCommitter.class);
     
     DAG dag = new DAG("WordCount");
@@ -246,9 +209,9 @@ public class WordCount {
                 DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
                 SchedulingType.SEQUENTIAL, 
                 new OutputDescriptor(OnFileSortedOutput.class.getName())
-                        .setUserPayload(mapPayload), 
+                        .setUserPayload(intermediateDataPayload), 
                 new InputDescriptor(ShuffledMergedInput.class.getName())
-                        .setUserPayload(finalReducePayload))));
+                        .setUserPayload(intermediateDataPayload))));
     return dag;  
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/465dea1c/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
index b4adbc4..9ad352d 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
@@ -30,6 +30,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Vector;
 
+import javax.annotation.Nullable;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
@@ -146,6 +148,57 @@ public class MRHelpers {
     }
   }
 
+  /**
+   * Create the user payload to be set on intermediate edge Input/Output classes
+   * that use MapReduce Key-Value data types. If the input and output have
+   * different configurations then this method may be called separately for both
+   * to get different payloads. If the input and output have no special
+   * configuration then this method may be called once to get the common payload
+   * for both input and output.
+   * 
+   * @param conf
+   *          Configuration for the class
+   * @param keyClassName
+   *          Class name of the Key
+   * @param valueClassName
+   *          Class name of the Value
+   * @param useNewApi
+   *          use new mapreduce API or old mapred API
+   * @param keyComparatorClassName
+   *          Optional key comparator class name
+   * @param compressionCodecClassName
+   *          Optional compression codec
+   * @return
+   * @throws IOException
+   */
+  public static byte[] createMRIntermediateDataPayload(Configuration conf,
+      String keyClassName, String valueClassName, boolean useNewApi,
+      @Nullable String keyComparatorClassName,
+      @Nullable String compressionCodecClassName) throws IOException {
+    Preconditions.checkNotNull(conf);
+    Preconditions.checkNotNull(keyClassName);
+    Preconditions.checkNotNull(valueClassName);
+    Configuration intermediateDataConf = new JobConf(conf);
+    intermediateDataConf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS, keyClassName);
+    intermediateDataConf
+        .set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS, valueClassName);
+    if (keyComparatorClassName != null) {
+      intermediateDataConf.set(MRJobConfig.KEY_COMPARATOR,
+          keyComparatorClassName);
+    }
+    if (compressionCodecClassName != null) {
+      intermediateDataConf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
+      intermediateDataConf.set(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC,
+          compressionCodecClassName);
+    }
+    intermediateDataConf.setBoolean("mapred.mapper.new-api", useNewApi);
+    MultiStageMRConfToTezTranslator.translateVertexConfToTez(
+        intermediateDataConf, intermediateDataConf);
+    MRHelpers.doJobClientMagic(intermediateDataConf);
+
+    return TezUtils.createUserPayloadFromConf(intermediateDataConf);
+  }
+
   @SuppressWarnings({ "rawtypes", "unchecked" })
   @Private
   public static org.apache.hadoop.mapreduce.InputSplit[] generateNewSplits(

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/465dea1c/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index 9ba552e..0eebfb4 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -46,12 +46,15 @@ import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez;
+import org.apache.hadoop.mapreduce.split.TezGroupedSplitsInputFormat;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
 import org.apache.tez.mapreduce.hadoop.mapred.MRReporter;
 import org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
@@ -111,6 +114,51 @@ public class MRInput extends AbstractLogicalInput {
   @Private
   volatile boolean splitInfoViaEvents;
   
+  /**
+   * Helper API to generate the user payload for the MRInput and
+   * MRInputAMSplitGenerator (if used). The InputFormat will be invoked by Tez
+   * at DAG runtime to generate the input splits.
+   * 
+   * @param conf
+   *          Configuration for the InputFormat
+   * @param inputFormatClassName
+   *          Name of the class of the InputFormat
+   * @param useNewApi
+   *          use new mapreduce API or old mapred API
+   * @param groupSplitsInAM
+   *          do grouping of splits in the AM. If true then splits generated by
+   *          the InputFormat will be grouped in the AM based on available
+   *          resources, locality etc. This option may be set to true only when
+   *          using MRInputAMSplitGenerator as the initializer class in
+   *          {@link Vertex#addInput(String, org.apache.tez.dag.api.InputDescriptor, Class)}
+   * @return returns the user payload to be set on the InputDescriptor of  MRInput
+   * @throws IOException
+   */
+  public static byte[] createUserPayload(Configuration conf,
+      String inputFormatClassName, boolean useNewApi, boolean groupSplitsInAM)
+      throws IOException {
+    Configuration inputConf = new JobConf(conf);
+    String wrappedInputFormatClassName = null;
+    String configInputFormatClassName = null;
+    if (groupSplitsInAM) {
+      wrappedInputFormatClassName = inputFormatClassName;
+      configInputFormatClassName = TezGroupedSplitsInputFormat.class.getName();
+    } else {
+      wrappedInputFormatClassName = null;
+      configInputFormatClassName = inputFormatClassName;
+    }
+    inputConf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR,
+        configInputFormatClassName);
+    inputConf.setBoolean("mapred.mapper.new-api", useNewApi);
+    MultiStageMRConfToTezTranslator.translateVertexConfToTez(inputConf, null);
+    MRHelpers.doJobClientMagic(inputConf);
+    if (groupSplitsInAM) {
+      return MRHelpers.createMRInputPayloadWithGrouping(inputConf,
+          wrappedInputFormatClassName);
+    } else {
+      return MRHelpers.createMRInputPayload(inputConf, null);
+    }
+  }
   
   @Override
   public List<Event> initialize() throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/465dea1c/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index 8aeee26..170f5c8 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -42,7 +42,9 @@ import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.mapreduce.hadoop.MRConfig;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
 import org.apache.tez.mapreduce.hadoop.mapred.MRReporter;
 import org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
@@ -79,6 +81,25 @@ public class MROutput extends AbstractLogicalOutput {
   private boolean isMapperOutput;
 
   protected OutputCommitter committer;
+  
+  /**
+   * Creates the user payload to be set on the OutputDescriptor for MROutput
+   * @param conf Configuration for the OutputFormat
+   * @param outputFormatName Name of the class of the OutputFormat
+   * @param useNewApi Use new mapreduce API or old mapred API
+   * @return
+   * @throws IOException
+   */
+  public static byte[] createUserPayload(Configuration conf, 
+      String outputFormatName, boolean useNewApi) throws IOException {
+    Configuration outputConf = new JobConf(conf);
+    outputConf.set(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, outputFormatName);
+    outputConf.setBoolean("mapred.mapper.new-api", useNewApi);
+    MultiStageMRConfToTezTranslator.translateVertexConfToTez(outputConf,
+        null);
+    MRHelpers.doJobClientMagic(outputConf);
+    return TezUtils.createUserPayloadFromConf(outputConf);
+  }
 
   @Override
   public List<Event> initialize() throws IOException, InterruptedException {


Mime
View raw message