tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject [1/2] git commit: TEZ-1394. Create example code for OrderedWordCount (bikas)
Date Fri, 08 Aug 2014 18:43:20 GMT
Repository: tez
Updated Branches:
  refs/heads/master 47ccad809 -> 5328978f4


TEZ-1394. Create example code for OrderedWordCount (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ed32980f
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ed32980f
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ed32980f

Branch: refs/heads/master
Commit: ed32980f8352615aee9af8cbc8c865749fa37d4c
Parents: 47ccad8
Author: Bikas Saha <bikas@apache.org>
Authored: Fri Aug 8 11:42:51 2014 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Fri Aug 8 11:42:51 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 INSTALL.txt                                     |  10 +-
 .../apache/tez/examples/OrderedWordCount.java   | 215 ++++++++
 .../tez/mapreduce/examples/ExampleDriver.java   |   3 +
 .../mapreduce/examples/OrderedWordCount.java    | 527 -------------------
 .../tez/mapreduce/examples/WordCount.java       |  82 +--
 .../apache/tez/mapreduce/TestMRRJobsDAGApi.java | 197 +++++++
 .../org/apache/tez/test/TestSecureShuffle.java  |   4 +-
 .../java/org/apache/tez/test/TestTezJobs.java   | 248 +--------
 9 files changed, 454 insertions(+), 833 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/ed32980f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c1e5450..228e682 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -49,6 +49,7 @@ INCOMPATIBLE CHANGES
   TEZ-1379. Allow EdgeConfigurers to accept Configuration for Comparators. Change the way partitioner, comparator, combiner confs are set (from Hadoop Configuration to Map). Rename specific Input/Output classes from *Configuration to *Configurer.
   TEZ-1382. Change ObjectRegistry API to allow for future extensions
   TEZ-1386. TezGroupedSplitsInputFormat should not need to be setup to enable grouping.
+  TEZ-1394. Create example code for OrderedWordCount
 
 Release 0.4.0-incubating: 2014-04-05
 

http://git-wip-us.apache.org/repos/asf/tez/blob/ed32980f/INSTALL.txt
----------------------------------------------------------------------
diff --git a/INSTALL.txt b/INSTALL.txt
index 1328997..6756b8f 100644
--- a/INSTALL.txt
+++ b/INSTALL.txt
@@ -40,25 +40,25 @@ $HADOOP_PREFIX/bin/hadoop jar hadoop-mapreduce-client-jobclient-2.2.0-tests.jar
 This will use the TEZ DAG ApplicationMaster to run the MR job. This can be
 verified by looking at the AM's logs from the YARN ResourceManager UI.
 
-7) There is a basic example of using an MRR job in the tez-mapreduce-examples.jar. Refer to OrderedWordCount.java
+7) There is a basic example of using an MRR job in the tez-mapreduce-examples.jar. Refer to TestOrderedWordCount.java
 in the source code. To run this example:
 
-$HADOOP_PREFIX/bin/hadoop jar tez-mapreduce-examples.jar orderedwordcount <input> <output>
+$HADOOP_PREFIX/bin/hadoop jar tez-mapreduce-examples.jar testorderedwordcount <input> <output>
 
 This will use the TEZ DAG ApplicationMaster to run the ordered word count job. This job is similar
 to the word count example except that it also orders all words based on the frequency of
 occurrence.
 
-There are multiple variations to run orderedwordcount. You can use it to run multiple
+There are multiple variations to run testorderedwordcount. You can use it to run multiple
 DAGs serially on different inputs/outputs. These DAGs could be run separately as
 different applications or serially within a single TEZ session.
 
-$HADOOP_PREFIX/bin/hadoop jar tez-mapreduce-examples.jar orderedwordcount <input1> <output1> <input2> <output2> <input3> <output3> ...
+$HADOOP_PREFIX/bin/hadoop jar tez-mapreduce-examples.jar testorderedwordcount <input1> <output1> <input2> <output2> <input3> <output3> ...
 
 The above will run multiple DAGs for each input-output pair. To use TEZ sessions,
 set -DUSE_TEZ_SESSION=true
 
-$HADOOP_PREFIX/bin/hadoop jar tez-mapreduce-examples.jar orderedwordcount -DUSE_TEZ_SESSION=true <input1> <output1> <input2> <output2>
+$HADOOP_PREFIX/bin/hadoop jar tez-mapreduce-examples.jar testorderedwordcount -DUSE_TEZ_SESSION=true <input1> <output1> <input2> <output2>
 
 
 

http://git-wip-us.apache.org/repos/asf/tez/blob/ed32980f/tez-mapreduce-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
new file mode 100644
index 0000000..d0fd83b
--- /dev/null
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.examples;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DataSinkDescriptor;
+import org.apache.tez.dag.api.DataSourceDescriptor;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.mapreduce.examples.WordCount.TokenProcessor;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
+import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
+import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfigurer;
+import org.apache.tez.runtime.library.partitioner.HashPartitioner;
+import org.apache.tez.runtime.library.processor.SimpleProcessor;
+
+import com.google.common.base.Preconditions;
+
+public class OrderedWordCount extends Configured implements Tool  {
+  
+  public static class SumProcessor extends SimpleProcessor {
+    public SumProcessor(TezProcessorContext context) {
+      super(context);
+    }
+
+    @Override
+    public void run() throws Exception {
+      Preconditions.checkArgument(getInputs().size() == 1);
+      Preconditions.checkArgument(getOutputs().size() == 1);
+      KeyValueWriter kvWriter = (KeyValueWriter) getOutputs().values().iterator().next().getWriter();
+      KeyValuesReader kvReader = (KeyValuesReader) getInputs().values().iterator().next()
+          .getReader();
+      while (kvReader.next()) {
+        Text word = (Text) kvReader.getCurrentKey();
+        int sum = 0;
+        for (Object value : kvReader.getCurrentValues()) {
+          sum += ((IntWritable) value).get();
+        }
+        kvWriter.write(new IntWritable(sum), word);
+      }
+    }
+  }
+  
+  /**
+   * The OrderedPartitionedKV edge delivers the data sorted by the sum key,
+   * which, with a single sorter task, means that the result is totally ordered
+   */
+  public static class NoOpSorter extends SimpleMRProcessor {
+
+    public NoOpSorter(TezProcessorContext context) {
+      super(context);
+    }
+
+    @Override
+    public void run() throws Exception {
+      Preconditions.checkArgument(getInputs().size() == 1);
+      Preconditions.checkArgument(getOutputs().size() == 1);
+      KeyValueWriter kvWriter = (KeyValueWriter) getOutputs().values().iterator().next().getWriter();
+      KeyValuesReader kvReader = (KeyValuesReader) getInputs().values().iterator().next()
+          .getReader();
+      while (kvReader.next()) {
+        Object sum = kvReader.getCurrentKey();
+        for (Object word : kvReader.getCurrentValues()) {
+          kvWriter.write(word, sum);
+        }
+      }
+      // deriving from SimpleMRProcessor takes care of committing the output
+    }
+  }
+  
+  private DAG createDAG(TezConfiguration tezConf, Map<String, LocalResource> localResources,
+      String inputPath, String outputPath, int numPartitions) throws IOException {
+
+    DataSourceDescriptor dataSource = MRInput.createConfigurer(new Configuration(tezConf),
+        TextInputFormat.class, inputPath).create();
+
+    DataSinkDescriptor dataSink = MROutput.createConfigurer(new Configuration(tezConf),
+        TextOutputFormat.class, outputPath).create();
+
+    Vertex tokenizerVertex = new Vertex("Tokenizer", new ProcessorDescriptor(
+        TokenProcessor.class.getName()), -1, MRHelpers.getMapResource(tezConf));
+    tokenizerVertex.addDataSource("MRInput", dataSource);
+
+    Vertex summationVertex = new Vertex("Summation",
+        new ProcessorDescriptor(
+            SumProcessor.class.getName()), numPartitions, MRHelpers.getReduceResource(tezConf));
+    
+    // 1 task for global sorted order
+    Vertex sorterVertex = new Vertex("Sorter",
+        new ProcessorDescriptor(
+            NoOpSorter.class.getName()), 1, MRHelpers.getReduceResource(tezConf));
+    sorterVertex.addDataSink("MROutput", dataSink);
+
+    OrderedPartitionedKVEdgeConfigurer summationEdgeConf = OrderedPartitionedKVEdgeConfigurer
+        .newBuilder(Text.class.getName(), IntWritable.class.getName(),
+            HashPartitioner.class.getName()).build();
+
+    OrderedPartitionedKVEdgeConfigurer sorterEdgeConf = OrderedPartitionedKVEdgeConfigurer
+        .newBuilder(IntWritable.class.getName(), Text.class.getName(),
+            HashPartitioner.class.getName()).build();
+
+    // No need to add the jar containing this class, as it is assumed to be part of the tez jars.
+    
+    DAG dag = new DAG("OrderedWordCount");
+    dag.addVertex(tokenizerVertex)
+        .addVertex(summationVertex)
+        .addVertex(sorterVertex)
+        .addEdge(
+            new Edge(tokenizerVertex, summationVertex, summationEdgeConf.createDefaultEdgeProperty()))
+        .addEdge(
+            new Edge(summationVertex, sorterVertex, sorterEdgeConf.createDefaultEdgeProperty()));
+    return dag;  
+  }
+  
+  private static void printUsage() {
+    System.err.println("Usage: " + " orderedwordcount in out [numPartitions]");
+    ToolRunner.printGenericCommandUsage(System.err);
+  }
+
+  public boolean run(String inputPath, String outputPath, Configuration conf,
+      int numPartitions) throws Exception {
+    System.out.println("Running OrderedWordCount");
+    TezConfiguration tezConf;
+    if (conf != null) {
+      tezConf = new TezConfiguration(conf);
+    } else {
+      tezConf = new TezConfiguration();
+    }
+    
+    TezClient tezClient = new TezClient("OrderedWordCount", tezConf);
+    tezClient.start();
+
+    try {
+        Map<String, LocalResource> localResources =
+          new TreeMap<String, LocalResource>();
+        
+        DAG dag = createDAG(tezConf, localResources, inputPath, outputPath, numPartitions);
+
+        tezClient.waitTillReady();
+        DAGClient dagClient = tezClient.submitDAG(dag);
+
+        // monitoring
+        DAGStatus dagStatus = dagClient.waitForCompletionWithAllStatusUpdates(null);
+        if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
+          System.out.println("OrderedWordCount failed with diagnostics: " + dagStatus.getDiagnostics());
+          return false;
+        }
+        return true;
+    } finally {
+      tezClient.stop();
+    }
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    Configuration conf = getConf();
+    String [] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+
+    if (otherArgs.length < 2 || otherArgs.length > 3) {
+      printUsage();
+      return 2;
+    }
+    OrderedWordCount job = new OrderedWordCount();
+    if (job.run(otherArgs[0], otherArgs[1], conf,
+        (otherArgs.length == 3 ? Integer.parseInt(otherArgs[2]) : 1))) {
+      return 0;
+    }
+    return 1;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(), new OrderedWordCount(), args);
+    System.exit(res);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/ed32980f/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
index 636e83b..daebf7c 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
@@ -31,6 +31,7 @@ import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.Progress;
 import org.apache.tez.dag.api.client.StatusGetOpts;
 import org.apache.tez.dag.api.client.VertexStatus;
+import org.apache.tez.examples.OrderedWordCount;
 import org.apache.tez.mapreduce.examples.terasort.TeraGen;
 import org.apache.tez.mapreduce.examples.terasort.TeraSort;
 import org.apache.tez.mapreduce.examples.terasort.TeraValidate;
@@ -79,6 +80,8 @@ public class ExampleDriver {
           "MRR Sleep Job");
       pgd.addClass("orderedwordcount", OrderedWordCount.class,
           "Word Count with words sorted on frequency");
+      pgd.addClass("testorderedwordcount", TestOrderedWordCount.class,
+          "Word Count with words sorted on frequency");
       pgd.addClass("unionexample", UnionExample.class,
           "Union example");
       pgd.addClass("broadcastAndOneToOneExample", BroadcastAndOneToOneExample.class,

http://git-wip-us.apache.org/repos/asf/tez/blob/ed32980f/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
deleted file mode 100644
index 1d72745..0000000
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
+++ /dev/null
@@ -1,527 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.mapreduce.examples;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.StringTokenizer;
-import java.util.TreeMap;
-
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileAlreadyExistsException;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.tez.client.PreWarmContext;
-import org.apache.tez.client.TezClientUtils;
-import org.apache.tez.client.TezClient;
-import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.InputInitializerDescriptor;
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.api.VertexLocationHint;
-import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.dag.api.client.StatusGetOpts;
-import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
-import org.apache.tez.mapreduce.examples.helpers.SplitsInClientOptionParser;
-import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
-import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-import org.apache.tez.mapreduce.processor.map.MapProcessor;
-import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
-import org.apache.tez.runtime.api.TezRootInputInitializer;
-import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
-import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfigurer;
-import org.apache.tez.runtime.library.partitioner.HashPartitioner;
-import org.apache.tez.runtime.library.processor.SleepProcessor;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * An MRR job built on top of word count to return words sorted by
- * their frequency of occurrence.
- *
- * Use -DUSE_TEZ_SESSION=true to run jobs in a session mode.
- * If multiple input/outputs are provided, this job will process each pair
- * as a separate DAG in a sequential manner.
- * Use -DINTER_JOB_SLEEP_INTERVAL=<N> where N is the sleep interval in seconds
- * between the sequential DAGs.
- */
-public class OrderedWordCount extends Configured implements Tool {
-
-  private static Log LOG = LogFactory.getLog(OrderedWordCount.class);
-
-  public static class TokenizerMapper
-       extends Mapper<Object, Text, Text, IntWritable>{
-
-    private final static IntWritable one = new IntWritable(1);
-    private Text word = new Text();
-
-    public void map(Object key, Text value, Context context
-                    ) throws IOException, InterruptedException {
-      StringTokenizer itr = new StringTokenizer(value.toString());
-      while (itr.hasMoreTokens()) {
-        word.set(itr.nextToken());
-        context.write(word, one);
-      }
-    }
-  }
-
-  public static class IntSumReducer
-       extends Reducer<Text,IntWritable,IntWritable, Text> {
-    private IntWritable result = new IntWritable();
-
-    public void reduce(Text key, Iterable<IntWritable> values,
-                       Context context
-                       ) throws IOException, InterruptedException {
-      int sum = 0;
-      for (IntWritable val : values) {
-        sum += val.get();
-      }
-      result.set(sum);
-      context.write(result, key);
-    }
-  }
-
-  /**
-   * Shuffle ensures ordering based on count of employees per department
-   * hence the final reducer is a no-op and just emits the department name
-   * with the employee count per department.
-   */
-  public static class MyOrderByNoOpReducer
-      extends Reducer<IntWritable, Text, Text, IntWritable> {
-
-    public void reduce(IntWritable key, Iterable<Text> values,
-        Context context
-        ) throws IOException, InterruptedException {
-      for (Text word : values) {
-        context.write(word, key);
-      }
-    }
-  }
-
-  private Credentials credentials = new Credentials();
-
-  @VisibleForTesting
-  public DAG createDAG(FileSystem fs, Configuration conf,
-      Map<String, LocalResource> commonLocalResources, Path stagingDir,
-      int dagIndex, String inputPath, String outputPath,
-      boolean generateSplitsInClient) throws Exception {
-
-    Configuration mapStageConf = new JobConf(conf);
-    mapStageConf.set(MRJobConfig.MAP_CLASS_ATTR,
-        TokenizerMapper.class.getName());
-    mapStageConf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR,
-        TextInputFormat.class.getName());
-    mapStageConf.set(FileInputFormat.INPUT_DIR, inputPath);
-    mapStageConf.setBoolean("mapred.mapper.new-api", true);
-
-    InputSplitInfo inputSplitInfo = null;
-    if (generateSplitsInClient) {
-      inputSplitInfo = MRHelpers.generateInputSplits(mapStageConf, stagingDir);
-      mapStageConf.setInt(MRJobConfig.NUM_MAPS, inputSplitInfo.getNumTasks());
-    }
-
-    MRHelpers.translateVertexConfToTez(mapStageConf);
-
-    Configuration iReduceStageConf = new JobConf(conf);
-    // TODO replace with auto-reduce parallelism
-    iReduceStageConf.setInt(MRJobConfig.NUM_REDUCES, 2);
-    iReduceStageConf.set(MRJobConfig.REDUCE_CLASS_ATTR,
-        IntSumReducer.class.getName());
-    iReduceStageConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName());
-    iReduceStageConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS,
-        IntWritable.class.getName());
-    iReduceStageConf.setBoolean("mapred.mapper.new-api", true);
-    MRHelpers.translateVertexConfToTez(iReduceStageConf);
-
-    Configuration finalReduceConf = new JobConf(conf);
-    finalReduceConf.setInt(MRJobConfig.NUM_REDUCES, 1);
-    finalReduceConf.set(MRJobConfig.REDUCE_CLASS_ATTR,
-        MyOrderByNoOpReducer.class.getName());
-    finalReduceConf.set(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR,
-        TextOutputFormat.class.getName());
-    finalReduceConf.set(FileOutputFormat.OUTDIR, outputPath);
-    finalReduceConf.setBoolean("mapred.mapper.new-api", true);
-    finalReduceConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, IntWritable.class.getName());
-    finalReduceConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName());
-    MRHelpers.translateVertexConfToTez(finalReduceConf);
-
-    MRHelpers.doJobClientMagic(mapStageConf);
-    MRHelpers.doJobClientMagic(iReduceStageConf);
-    MRHelpers.doJobClientMagic(finalReduceConf);
-
-    List<Vertex> vertices = new ArrayList<Vertex>();
-
-    ByteArrayOutputStream outputStream = new ByteArrayOutputStream(4096);
-    mapStageConf.writeXml(outputStream);
-    String mapStageHistoryText = new String(outputStream.toByteArray(), "UTF-8");
-    byte[] mapPayload = MRHelpers.createUserPayloadFromConf(mapStageConf);
-    byte[] mapInputPayload;
-    if (generateSplitsInClient) {
-      mapInputPayload = MRHelpers.createMRInputPayload(mapPayload);
-    } else {
-      mapInputPayload = MRHelpers.createMRInputPayloadWithGrouping(mapPayload);
-    }
-    int numMaps = generateSplitsInClient ? inputSplitInfo.getNumTasks() : -1;
-    Vertex mapVertex = new Vertex("initialmap", new ProcessorDescriptor(
-        MapProcessor.class.getName()).setUserPayload(mapPayload)
-            .setHistoryText(mapStageHistoryText),
-        numMaps, MRHelpers.getMapResource(mapStageConf));
-    if (generateSplitsInClient) {
-      mapVertex.setLocationHint(new VertexLocationHint(inputSplitInfo.getTaskLocationHints()));
-      Map<String, LocalResource> mapLocalResources =
-          new HashMap<String, LocalResource>();
-      mapLocalResources.putAll(commonLocalResources);
-      MRHelpers.updateLocalResourcesForInputSplits(fs, inputSplitInfo,
-          mapLocalResources);
-      mapVertex.setTaskLocalFiles(mapLocalResources);
-    } else {
-      mapVertex.setTaskLocalFiles(commonLocalResources);
-    }
-
-    Class<? extends TezRootInputInitializer> initializerClazz = generateSplitsInClient ? null
-        : MRInputAMSplitGenerator.class;
-    MRHelpers.addMRInput(mapVertex, mapInputPayload,
-        (initializerClazz==null) ? null :
-          new InputInitializerDescriptor(initializerClazz.getName()));
-    vertices.add(mapVertex);
-
-    ByteArrayOutputStream iROutputStream = new ByteArrayOutputStream(4096);
-    iReduceStageConf.writeXml(iROutputStream);
-    String iReduceStageHistoryText = new String(iROutputStream.toByteArray(), "UTF-8");
-    Vertex ivertex = new Vertex("intermediate_reducer", new ProcessorDescriptor(
-        ReduceProcessor.class.getName())
-            .setUserPayload(MRHelpers.createUserPayloadFromConf(iReduceStageConf))
-            .setHistoryText(iReduceStageHistoryText),
-        2, MRHelpers.getReduceResource(iReduceStageConf));
-    ivertex.setTaskLocalFiles(commonLocalResources);
-    vertices.add(ivertex);
-
-    ByteArrayOutputStream finalReduceOutputStream = new ByteArrayOutputStream(4096);
-    finalReduceConf.writeXml(finalReduceOutputStream);
-    String finalReduceStageHistoryText = new String(finalReduceOutputStream.toByteArray(), "UTF-8");
-    byte[] finalReducePayload = MRHelpers.createUserPayloadFromConf(finalReduceConf);
-    Vertex finalReduceVertex = new Vertex("finalreduce",
-        new ProcessorDescriptor(
-            ReduceProcessor.class.getName())
-                .setUserPayload(finalReducePayload)
-                .setHistoryText(finalReduceStageHistoryText), 1,
-        MRHelpers.getReduceResource(finalReduceConf));
-    finalReduceVertex.setTaskLocalFiles(commonLocalResources);
-    MRHelpers.addMROutputLegacy(finalReduceVertex, finalReducePayload);
-    vertices.add(finalReduceVertex);
-
-    DAG dag = new DAG("OrderedWordCount" + dagIndex);
-    for (int i = 0; i < vertices.size(); ++i) {
-      dag.addVertex(vertices.get(i));
-    }
-
-    OrderedPartitionedKVEdgeConfigurer edgeConf1 = OrderedPartitionedKVEdgeConfigurer
-        .newBuilder(Text.class.getName(), IntWritable.class.getName(),
-            HashPartitioner.class.getName()).setFromConfiguration(conf)
-	    .configureInput().useLegacyInput().done().build();
-    dag.addEdge(
-        new Edge(dag.getVertex("initialmap"), dag.getVertex("intermediate_reducer"),
-            edgeConf1.createDefaultEdgeProperty()));
-
-    OrderedPartitionedKVEdgeConfigurer edgeConf2 = OrderedPartitionedKVEdgeConfigurer
-        .newBuilder(IntWritable.class.getName(), Text.class.getName(),
-            HashPartitioner.class.getName()).setFromConfiguration(conf)
-            .configureInput().useLegacyInput().done().build();
-    dag.addEdge(
-        new Edge(dag.getVertex("intermediate_reducer"), dag.getVertex("finalreduce"),
-            edgeConf2.createDefaultEdgeProperty()));
-
-    return dag;
-  }
-
-  private static void printUsage() {
-    String options = " [-generateSplitsInClient true/<false>]";
-    System.err.println("Usage: orderedwordcount <in> <out>" + options);
-    System.err.println("Usage (In Session Mode):"
-        + " orderedwordcount <in1> <out1> ... <inN> <outN>" + options);
-    ToolRunner.printGenericCommandUsage(System.err);
-  }
-
-
-  @Override
-  public int run(String[] args) throws Exception {
-    Configuration conf = getConf();
-    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
-
-    boolean generateSplitsInClient;
-
-    SplitsInClientOptionParser splitCmdLineParser = new SplitsInClientOptionParser();
-    try {
-      generateSplitsInClient = splitCmdLineParser.parse(otherArgs, false);
-      otherArgs = splitCmdLineParser.getRemainingArgs();
-    } catch (ParseException e1) {
-      System.err.println("Invalid options");
-      printUsage();
-      return 2;
-    }
-
-    boolean useTezSession = conf.getBoolean("USE_TEZ_SESSION", true);
-    long interJobSleepTimeout = conf.getInt("INTER_JOB_SLEEP_INTERVAL", 0)
-        * 1000;
-
-    boolean retainStagingDir = conf.getBoolean("RETAIN_STAGING_DIR", false);
-
-    if (((otherArgs.length%2) != 0)
-        || (!useTezSession && otherArgs.length != 2)) {
-      printUsage();
-      return 2;
-    }
-
-    List<String> inputPaths = new ArrayList<String>();
-    List<String> outputPaths = new ArrayList<String>();
-
-    for (int i = 0; i < otherArgs.length; i+=2) {
-      inputPaths.add(otherArgs[i]);
-      outputPaths.add(otherArgs[i+1]);
-    }
-
-    UserGroupInformation.setConfiguration(conf);
-
-    TezConfiguration tezConf = new TezConfiguration(conf);
-    OrderedWordCount instance = new OrderedWordCount();
-
-    FileSystem fs = FileSystem.get(conf);
-
-    String stagingDirStr =  conf.get(TezConfiguration.TEZ_AM_STAGING_DIR,
-            TezConfiguration.TEZ_AM_STAGING_DIR_DEFAULT) + Path.SEPARATOR + 
-            Long.toString(System.currentTimeMillis());
-    Path stagingDir = new Path(stagingDirStr);
-    FileSystem pathFs = stagingDir.getFileSystem(tezConf);
-    pathFs.mkdirs(new Path(stagingDirStr));
-
-    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirStr);
-    stagingDir = pathFs.makeQualified(new Path(stagingDirStr));
-    
-    TokenCache.obtainTokensForNamenodes(instance.credentials, new Path[] {stagingDir}, conf);
-    TezClientUtils.ensureStagingDirExists(tezConf, stagingDir);
-
-    // No need to add jar containing this class as assumed to be part of
-    // the tez jars.
-
-    // TEZ-674 Obtain tokens based on the Input / Output paths. For now assuming staging dir
-    // is the same filesystem as the one used for Input/Output.
-    
-    if (useTezSession) {
-      LOG.info("Creating Tez Session");
-      tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true);
-    } else {
-      tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, false);
-    }
-    TezClient tezSession = new TezClient("OrderedWordCountSession", tezConf,
-        null, instance.credentials);
-    tezSession.start();
-
-    DAGStatus dagStatus = null;
-    DAGClient dagClient = null;
-    String[] vNames = { "initialmap", "intermediate_reducer",
-      "finalreduce" };
-
-    Set<StatusGetOpts> statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
-    try {
-      for (int dagIndex = 1; dagIndex <= inputPaths.size(); ++dagIndex) {
-        if (dagIndex != 1
-            && interJobSleepTimeout > 0) {
-          try {
-            LOG.info("Sleeping between jobs, sleepInterval="
-                + (interJobSleepTimeout/1000));
-            Thread.sleep(interJobSleepTimeout);
-          } catch (InterruptedException e) {
-            LOG.info("Main thread interrupted. Breaking out of job loop");
-            break;
-          }
-        }
-
-        String inputPath = inputPaths.get(dagIndex-1);
-        String outputPath = outputPaths.get(dagIndex-1);
-
-        if (fs.exists(new Path(outputPath))) {
-          throw new FileAlreadyExistsException("Output directory "
-              + outputPath + " already exists");
-        }
-        LOG.info("Running OrderedWordCount DAG"
-            + ", dagIndex=" + dagIndex
-            + ", inputPath=" + inputPath
-            + ", outputPath=" + outputPath);
-
-        Map<String, LocalResource> localResources =
-          new TreeMap<String, LocalResource>();
-        
-        DAG dag = instance.createDAG(fs, conf, localResources,
-            stagingDir, dagIndex, inputPath, outputPath,
-            generateSplitsInClient);
-
-        boolean doPreWarm = dagIndex == 1 && useTezSession
-            && conf.getBoolean("PRE_WARM_SESSION", true);
-        int preWarmNumContainers = 0;
-        if (doPreWarm) {
-          preWarmNumContainers = conf.getInt("PRE_WARM_NUM_CONTAINERS", 0);
-          if (preWarmNumContainers <= 0) {
-            doPreWarm = false;
-          }
-        }
-        if (doPreWarm) {
-          LOG.info("Pre-warming Session");
-          VertexLocationHint vertexLocationHint =
-              new VertexLocationHint(null);
-          ProcessorDescriptor sleepProcDescriptor =
-            new ProcessorDescriptor(SleepProcessor.class.getName());
-          SleepProcessor.SleepProcessorConfig sleepProcessorConfig =
-            new SleepProcessor.SleepProcessorConfig(4000);
-          sleepProcDescriptor.setUserPayload(
-            sleepProcessorConfig.toUserPayload());
-          PreWarmContext context = new PreWarmContext(sleepProcDescriptor,
-            dag.getVertex("initialmap").getTaskResource(), preWarmNumContainers,
-              vertexLocationHint);
-
-          Map<String, LocalResource> contextLocalRsrcs =
-            new TreeMap<String, LocalResource>();
-          contextLocalRsrcs.putAll(
-            dag.getVertex("initialmap").getTaskLocalFiles());
-          Map<String, String> contextEnv = new TreeMap<String, String>();
-          contextEnv.putAll(dag.getVertex("initialmap").getTaskEnvironment());
-          String contextJavaOpts =
-            dag.getVertex("initialmap").getTaskLaunchCmdOpts();
-          context
-            .setLocalResources(contextLocalRsrcs)
-            .setJavaOpts(contextJavaOpts)
-            .setEnvironment(contextEnv);
-
-          tezSession.preWarm(context);
-        }
-
-        if (useTezSession) {
-          LOG.info("Waiting for TezSession to get into ready state");
-          waitForTezSessionReady(tezSession);
-          LOG.info("Submitting DAG to Tez Session, dagIndex=" + dagIndex);
-          dagClient = tezSession.submitDAG(dag);
-          LOG.info("Submitted DAG to Tez Session, dagIndex=" + dagIndex);
-        } else {
-          LOG.info("Submitting DAG as a new Tez Application");
-          dagClient = tezSession.submitDAG(dag);
-        }
-
-        while (true) {
-          dagStatus = dagClient.getDAGStatus(statusGetOpts);
-          if (dagStatus.getState() == DAGStatus.State.RUNNING ||
-              dagStatus.getState() == DAGStatus.State.SUCCEEDED ||
-              dagStatus.getState() == DAGStatus.State.FAILED ||
-              dagStatus.getState() == DAGStatus.State.KILLED ||
-              dagStatus.getState() == DAGStatus.State.ERROR) {
-            break;
-          }
-          try {
-            Thread.sleep(500);
-          } catch (InterruptedException e) {
-            // continue;
-          }
-        }
-
-
-        while (dagStatus.getState() != DAGStatus.State.SUCCEEDED &&
-            dagStatus.getState() != DAGStatus.State.FAILED &&
-            dagStatus.getState() != DAGStatus.State.KILLED &&
-            dagStatus.getState() != DAGStatus.State.ERROR) {
-          if (dagStatus.getState() == DAGStatus.State.RUNNING) {
-            ExampleDriver.printDAGStatus(dagClient, vNames);
-          }
-          try {
-            try {
-              Thread.sleep(1000);
-            } catch (InterruptedException e) {
-              // continue;
-            }
-            dagStatus = dagClient.getDAGStatus(statusGetOpts);
-          } catch (TezException e) {
-            LOG.fatal("Failed to get application progress. Exiting");
-            return -1;
-          }
-        }
-        ExampleDriver.printDAGStatus(dagClient, vNames,
-            true, true);
-        LOG.info("DAG " + dagIndex + " completed. "
-            + "FinalState=" + dagStatus.getState());
-        if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
-          LOG.info("DAG " + dagIndex + " diagnostics: "
-            + dagStatus.getDiagnostics());
-        }
-      }
-    } catch (Exception e) {
-      LOG.error("Error occurred when submitting/running DAGs", e);
-      throw e;
-    } finally {
-      if (!retainStagingDir) {
-        pathFs.delete(stagingDir, true);
-      }
-      LOG.info("Shutting down session");
-      tezSession.stop();
-    }
-
-    if (!useTezSession) {
-      ExampleDriver.printDAGStatus(dagClient, vNames);
-      LOG.info("Application completed. " + "FinalState=" + dagStatus.getState());
-    }
-    return dagStatus.getState() == DAGStatus.State.SUCCEEDED ? 0 : 1;
-  }
-
-  private static void waitForTezSessionReady(TezClient tezSession)
-    throws IOException, TezException, InterruptedException {
-    tezSession.waitTillReady();
-  }
-
-  public static void main(String[] args) throws Exception {
-    int res = ToolRunner.run(new Configuration(), new OrderedWordCount(), args);
-    System.exit(res);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/ed32980f/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
index cb13025..593eeff 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
@@ -18,24 +18,17 @@
 package org.apache.tez.mapreduce.examples;
 
 import java.io.IOException;
-import java.util.Map;
 import java.util.StringTokenizer;
-import java.util.TreeMap;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileAlreadyExistsException;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
-import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.DataSinkDescriptor;
@@ -60,10 +53,11 @@ import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfigurer;
 import com.google.common.base.Preconditions;
 
 import org.apache.tez.runtime.library.partitioner.HashPartitioner;
+import org.apache.tez.runtime.library.processor.SimpleProcessor;
 
 
 public class WordCount extends Configured implements Tool {
-  public static class TokenProcessor extends SimpleMRProcessor {
+  public static class TokenProcessor extends SimpleProcessor {
     IntWritable one = new IntWritable(1);
     Text word = new Text();
 
@@ -98,8 +92,8 @@ public class WordCount extends Configured implements Tool {
     @Override
     public void run() throws Exception {
       Preconditions.checkArgument(getInputs().size() == 1);
-      MROutput out = (MROutput) getOutputs().values().iterator().next();
-      KeyValueWriter kvWriter = out.getWriter();
+      Preconditions.checkArgument(getOutputs().size() == 1);
+      KeyValueWriter kvWriter = (KeyValueWriter) getOutputs().values().iterator().next().getWriter();
       KeyValuesReader kvReader = (KeyValuesReader) getInputs().values().iterator().next()
           .getReader();
       while (kvReader.next()) {
@@ -110,12 +104,12 @@ public class WordCount extends Configured implements Tool {
         }
         kvWriter.write(word, new IntWritable(sum));
       }
+      // deriving from SimpleMRProcessor takes care of committing the output
     }
   }
 
-  private DAG createDAG(FileSystem fs, TezConfiguration tezConf,
-      Map<String, LocalResource> localResources, Path stagingDir,
-      String inputPath, String outputPath) throws IOException {
+  private DAG createDAG(TezConfiguration tezConf, String inputPath, String outputPath,
+      int numPartitions) throws IOException {
 
     DataSourceDescriptor dataSource = MRInput.createConfigurer(new Configuration(tezConf),
         TextInputFormat.class, inputPath).create();
@@ -123,19 +117,21 @@ public class WordCount extends Configured implements Tool {
     DataSinkDescriptor dataSink = MROutput.createConfigurer(new Configuration(tezConf),
         TextOutputFormat.class, outputPath).create();
 
-    Vertex tokenizerVertex = new Vertex("tokenizer", new ProcessorDescriptor(
+    Vertex tokenizerVertex = new Vertex("Tokenizer", new ProcessorDescriptor(
         TokenProcessor.class.getName()), -1, MRHelpers.getMapResource(tezConf));
-    tokenizerVertex.addDataSource("MRInput", dataSource);
+    tokenizerVertex.addDataSource("Input", dataSource);
 
-    Vertex summerVertex = new Vertex("summer",
+    Vertex summerVertex = new Vertex("Summer",
         new ProcessorDescriptor(
-            SumProcessor.class.getName()), 1, MRHelpers.getReduceResource(tezConf));
-    summerVertex.addDataSink("MROutput", dataSink);
+            SumProcessor.class.getName()), numPartitions, MRHelpers.getReduceResource(tezConf));
+    summerVertex.addDataSink("Output", dataSink);
 
     OrderedPartitionedKVEdgeConfigurer edgeConf = OrderedPartitionedKVEdgeConfigurer
         .newBuilder(Text.class.getName(), IntWritable.class.getName(),
             HashPartitioner.class.getName()).build();
 
+    // No need to add jar containing this class as assumed to be part of
+    // the tez jars.
     DAG dag = new DAG("WordCount");
     dag.addVertex(tokenizerVertex)
         .addVertex(summerVertex)
@@ -145,66 +141,38 @@ public class WordCount extends Configured implements Tool {
   }
 
   private static void printUsage() {
-    System.err.println("Usage: " + " wordcount <in1> <out1>");
+    System.err.println("Usage: " + " wordcount in out [numPartitions]");
     ToolRunner.printGenericCommandUsage(System.err);
   }
 
-  public boolean run(String inputPath, String outputPath, Configuration conf) throws Exception {
+  public boolean run(String inputPath, String outputPath, Configuration conf,
+      int numPartitions) throws Exception {
     System.out.println("Running WordCount");
-    // conf and UGI
     TezConfiguration tezConf;
     if (conf != null) {
       tezConf = new TezConfiguration(conf);
     } else {
       tezConf = new TezConfiguration();
     }
-    UserGroupInformation.setConfiguration(tezConf);
-    String user = UserGroupInformation.getCurrentUser().getShortUserName();
-
-    // staging dir
-    FileSystem fs = FileSystem.get(tezConf);
-    String stagingDirStr = Path.SEPARATOR + "user" + Path.SEPARATOR
-        + user + Path.SEPARATOR+ ".staging" + Path.SEPARATOR
-        + Path.SEPARATOR + Long.toString(System.currentTimeMillis());
-    Path stagingDir = new Path(stagingDirStr);
-    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirStr);
-    stagingDir = fs.makeQualified(stagingDir);
-    
-    // No need to add jar containing this class as assumed to be part of
-    // the tez jars.
 
-    // TEZ-674 Obtain tokens based on the Input / Output paths. For now assuming staging dir
-    // is the same filesystem as the one used for Input/Output.
-    
     TezClient tezSession = new TezClient("WordCountSession", tezConf);
     tezSession.start();
 
-    DAGClient dagClient = null;
 
     try {
-        if (fs.exists(new Path(outputPath))) {
-          throw new FileAlreadyExistsException("Output directory "
-              + outputPath + " already exists");
-        }
-        
-        Map<String, LocalResource> localResources =
-          new TreeMap<String, LocalResource>();
-        
-        DAG dag = createDAG(fs, tezConf, localResources,
-            stagingDir, inputPath, outputPath);
+        DAG dag = createDAG(tezConf, inputPath, outputPath, numPartitions);
 
         tezSession.waitTillReady();
-        dagClient = tezSession.submitDAG(dag);
+        DAGClient dagClient = tezSession.submitDAG(dag);
 
         // monitoring
         DAGStatus dagStatus = dagClient.waitForCompletionWithAllStatusUpdates(null);
         if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
-          System.out.println("DAG diagnostics: " + dagStatus.getDiagnostics());
+          System.out.println("WordCount failed with diagnostics: " + dagStatus.getDiagnostics());
           return false;
         }
         return true;
     } finally {
-      fs.delete(stagingDir, true);
       tezSession.stop();
     }
   }
@@ -213,14 +181,16 @@ public class WordCount extends Configured implements Tool {
   public int run(String[] args) throws Exception {
     Configuration conf = getConf();
     String [] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
-
-    if (otherArgs.length != 2) {
+    if (otherArgs.length < 2 || otherArgs.length > 3) {
       printUsage();
       return 2;
     }
     WordCount job = new WordCount();
-    job.run(otherArgs[0], otherArgs[1], conf);
-    return 0;
+    if (job.run(otherArgs[0], otherArgs[1], conf,
+        (otherArgs.length == 3 ? Integer.parseInt(otherArgs[2]) : 1))) {
+      return 0;
+    }
+    return 1;
   }
 
   public static void main(String[] args) throws Exception {

http://git-wip-us.apache.org/repos/asf/tez/blob/ed32980f/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
index 7f7bd1e..4228382 100644
--- a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
+++ b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
@@ -18,6 +18,11 @@
 
 package org.apache.tez.mapreduce;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
@@ -68,6 +73,8 @@ import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.client.TezAppMasterStatus;
 import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.common.counters.FileSystemCounter;
+import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.EdgeProperty;
@@ -85,9 +92,12 @@ import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.StatusGetOpts;
 import org.apache.tez.dag.api.client.DAGStatus.State;
+import org.apache.tez.dag.history.logging.impl.SimpleHistoryLoggingService;
 import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
 import org.apache.tez.mapreduce.examples.BroadcastAndOneToOneExample;
+import org.apache.tez.mapreduce.examples.ExampleDriver;
 import org.apache.tez.mapreduce.examples.MRRSleepJob;
 import org.apache.tez.mapreduce.examples.MRRSleepJob.ISleepReducer;
 import org.apache.tez.mapreduce.examples.MRRSleepJob.MRRSleepJobPartitioner;
@@ -106,6 +116,8 @@ import org.apache.tez.runtime.api.TezRootInputInitializer;
 import org.apache.tez.runtime.api.TezRootInputInitializerContext;
 import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
 import org.apache.tez.runtime.library.output.OnFileSortedOutput;
+import org.apache.tez.runtime.library.processor.SleepProcessor;
+import org.apache.tez.runtime.library.processor.SleepProcessor.SleepProcessorConfig;
 import org.apache.tez.test.MiniTezCluster;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -113,6 +125,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
 
 public class TestMRRJobsDAGApi {
 
@@ -163,6 +176,190 @@ public class TestMRRJobsDAGApi {
     }
     // TODO Add cleanup code.
   }
+  
+  @Test(timeout = 60000)
+  public void testSleepJob() throws TezException, IOException, InterruptedException {
+    SleepProcessorConfig spConf = new SleepProcessorConfig(1);
+
+    DAG dag = new DAG("TezSleepProcessor");
+    Vertex vertex = new Vertex("SleepVertex", new ProcessorDescriptor(
+        SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
+        Resource.newInstance(1024, 1));
+    dag.addVertex(vertex);
+
+    TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
+    Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random
+        .nextInt(100000))));
+    remoteFs.mkdirs(remoteStagingDir);
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
+
+    TezClient tezSession = new TezClient("TezSleepProcessor", tezConf, false);
+    tezSession.start();
+
+    DAGClient dagClient = tezSession.submitDAG(dag);
+
+    DAGStatus dagStatus = dagClient.getDAGStatus(null);
+    while (!dagStatus.isCompleted()) {
+      LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
+          + dagStatus.getState());
+      Thread.sleep(500l);
+      dagStatus = dagClient.getDAGStatus(null);
+    }
+    dagStatus = dagClient.getDAGStatus(Sets.newHashSet(StatusGetOpts.GET_COUNTERS));
+
+    assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
+    assertNotNull(dagStatus.getDAGCounters());
+    assertNotNull(dagStatus.getDAGCounters().getGroup(FileSystemCounter.class.getName()));
+    assertNotNull(dagStatus.getDAGCounters().findCounter(TaskCounter.GC_TIME_MILLIS));
+    ExampleDriver.printDAGStatus(dagClient, new String[] { "SleepVertex" }, true, true);
+    tezSession.stop();
+  }
+
+  @Test(timeout = 100000)
+  public void testMultipleDAGsWithDuplicateName() throws TezException, IOException,
+      InterruptedException {
+    TezClient tezSession = null;
+    try {
+      TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
+      Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random
+          .nextInt(100000))));
+      remoteFs.mkdirs(remoteStagingDir);
+      tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
+      tezSession = new TezClient("OrderedWordCountSession", tezConf, true);
+      tezSession.start();
+
+      SleepProcessorConfig spConf = new SleepProcessorConfig(1);
+      for (int dagIndex = 1; dagIndex <= 2; dagIndex++) {
+        DAG dag = new DAG("TezSleepProcessor");
+        Vertex vertex = new Vertex("SleepVertex", new ProcessorDescriptor(
+            SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
+            Resource.newInstance(1024, 1));
+        dag.addVertex(vertex);
+
+        DAGClient dagClient = null;
+        try {
+          dagClient = tezSession.submitDAG(dag);
+          if (dagIndex > 1) {
+            fail("Should fail due to duplicate dag name for dagIndex: " + dagIndex);
+          }
+        } catch (TezException tex) {
+          if (dagIndex > 1) {
+            assertTrue(tex.getMessage().contains("Duplicate dag name "));
+            continue;
+          }
+          fail("DuplicateDAGName exception thrown for 1st DAG submission");
+        }
+        DAGStatus dagStatus = dagClient.getDAGStatus(null);
+        while (!dagStatus.isCompleted()) {
+          LOG.debug("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
+              + dagStatus.getState());
+          Thread.sleep(500l);
+          dagStatus = dagClient.getDAGStatus(null);
+        }
+      }
+    } finally {
+      if (tezSession != null) {
+        tezSession.stop();
+      }
+    }
+  }
+  
+  
+  @Test
+  public void testNonDefaultFSStagingDir() throws Exception {
+    SleepProcessorConfig spConf = new SleepProcessorConfig(1);
+
+    DAG dag = new DAG("TezSleepProcessor");
+    Vertex vertex = new Vertex("SleepVertex", new ProcessorDescriptor(
+        SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
+        Resource.newInstance(1024, 1));
+    dag.addVertex(vertex);
+
+    TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
+    Path stagingDir = new Path(TEST_ROOT_DIR, "testNonDefaultFSStagingDir"
+        + String.valueOf(random.nextInt(100000)));
+    FileSystem localFs = FileSystem.getLocal(tezConf);
+    stagingDir = localFs.makeQualified(stagingDir);
+    localFs.mkdirs(stagingDir);
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDir.toString());
+
+    TezClient tezSession = new TezClient("TezSleepProcessor", tezConf, false);
+    tezSession.start();
+
+    DAGClient dagClient = tezSession.submitDAG(dag);
+
+    DAGStatus dagStatus = dagClient.getDAGStatus(null);
+    while (!dagStatus.isCompleted()) {
+      LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
+          + dagStatus.getState());
+      Thread.sleep(500l);
+      dagStatus = dagClient.getDAGStatus(null);
+    }
+    dagStatus = dagClient.getDAGStatus(Sets.newHashSet(StatusGetOpts.GET_COUNTERS));
+
+    assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
+    assertNotNull(dagStatus.getDAGCounters());
+    assertNotNull(dagStatus.getDAGCounters().getGroup(FileSystemCounter.class.getName()));
+    assertNotNull(dagStatus.getDAGCounters().findCounter(TaskCounter.GC_TIME_MILLIS));
+    ExampleDriver.printDAGStatus(dagClient, new String[] { "SleepVertex" }, true, true);
+    tezSession.stop();
+  }
+
+  // Runs a single-vertex sleep DAG in non-session mode and verifies that a non-empty history log file is written.
+  @Test(timeout = 60000)
+  public void testHistoryLogging() throws IOException,
+      InterruptedException, TezException, ClassNotFoundException, YarnException {
+    SleepProcessorConfig spConf = new SleepProcessorConfig(1);
+
+    DAG dag = new DAG("TezSleepProcessorHistoryLogging");
+    Vertex vertex = new Vertex("SleepVertex", new ProcessorDescriptor(
+        SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 2,
+        Resource.newInstance(1024, 1));
+    dag.addVertex(vertex);
+
+    TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
+    Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random
+        .nextInt(100000))));
+    remoteFs.mkdirs(remoteStagingDir);
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
+
+    FileSystem localFs = FileSystem.getLocal(tezConf);
+    Path historyLogDir = new Path(TEST_ROOT_DIR, "testHistoryLogging");
+    localFs.mkdirs(historyLogDir);
+
+    tezConf.set(TezConfiguration.TEZ_SIMPLE_HISTORY_LOGGING_DIR,
+        localFs.makeQualified(historyLogDir).toString());
+
+    tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, false);
+    TezClient tezSession = new TezClient("TezSleepProcessorHistoryLogging", tezConf);
+    tezSession.start();
+
+    DAGClient dagClient = tezSession.submitDAG(dag);
+
+    DAGStatus dagStatus = dagClient.getDAGStatus(null);
+    while (!dagStatus.isCompleted()) {
+      LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
+          + dagStatus.getState());
+      Thread.sleep(500l);
+      dagStatus = dagClient.getDAGStatus(null);
+    }
+    assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
+
+    FileStatus historyLogFileStatus = null;
+    for (FileStatus fileStatus : localFs.listStatus(historyLogDir)) {
+      if (fileStatus.isDirectory()) {
+        continue;
+      }
+      Path p = fileStatus.getPath();
+      if (p.getName().startsWith(SimpleHistoryLoggingService.LOG_FILE_NAME_PREFIX)) {
+        historyLogFileStatus = fileStatus;
+        break;
+      }
+    }
+    Assert.assertNotNull(historyLogFileStatus);
+    Assert.assertTrue(historyLogFileStatus.getLen() > 0);
+    tezSession.stop();
+  }
 
   // Submits a simple 5 stage sleep job using the DAG submit API instead of job
   // client.

http://git-wip-us.apache.org/repos/asf/tez/blob/ed32980f/tez-tests/src/test/java/org/apache/tez/test/TestSecureShuffle.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestSecureShuffle.java b/tez-tests/src/test/java/org/apache/tez/test/TestSecureShuffle.java
index 9df1580..a947d4b 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestSecureShuffle.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestSecureShuffle.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
-import org.apache.tez.mapreduce.examples.OrderedWordCount;
+import org.apache.tez.mapreduce.examples.TestOrderedWordCount;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.junit.After;
 import org.junit.Before;
@@ -141,7 +141,7 @@ public class TestSecureShuffle {
           TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL, true);
     }
 
-    OrderedWordCount wordCount = new OrderedWordCount();
+    TestOrderedWordCount wordCount = new TestOrderedWordCount();
     wordCount.setConf(new Configuration(miniTezCluster.getConfig()));
 
     String[] args = new String[] { inputLoc.toString(), outputLoc.toString() };

http://git-wip-us.apache.org/repos/asf/tez/blob/ed32980f/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
index e35bc07..2515ecb 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
@@ -19,7 +19,6 @@
 package org.apache.tez.test;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -30,10 +29,7 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
 import java.util.HashSet;
-import java.util.Map;
-import java.util.Random;
 import java.util.Set;
-import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -45,38 +41,20 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.tez.client.TezClient;
-import org.apache.tez.common.counters.FileSystemCounter;
-import org.apache.tez.common.counters.TaskCounter;
-import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.dag.api.client.StatusGetOpts;
-import org.apache.tez.dag.history.logging.impl.SimpleHistoryLoggingService;
-import org.apache.tez.mapreduce.examples.ExampleDriver;
+import org.apache.tez.examples.OrderedWordCount;
 import org.apache.tez.mapreduce.examples.IntersectDataGen;
 import org.apache.tez.mapreduce.examples.IntersectExample;
 import org.apache.tez.mapreduce.examples.IntersectValidate;
-import org.apache.tez.mapreduce.examples.OrderedWordCount;
-import org.apache.tez.runtime.library.processor.SleepProcessor;
-import org.apache.tez.runtime.library.processor.SleepProcessor.SleepProcessorConfig;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import com.google.common.collect.Sets;
 
 /**
- * Tests which do not rely on Map/Reduce processor
+ * Tests for Tez example jobs
  * 
  */
 public class TestTezJobs {
@@ -88,7 +66,6 @@ public class TestTezJobs {
 
   private static Configuration conf = new Configuration();
   private static FileSystem remoteFs;
-  private Random random = new Random();
 
   private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR + TestTezJobs.class.getName()
       + "-tmpDir";
@@ -128,94 +105,6 @@ public class TestTezJobs {
   }
 
   @Test(timeout = 60000)
-  public void testSleepJob() throws TezException, IOException, InterruptedException {
-    SleepProcessorConfig spConf = new SleepProcessorConfig(1);
-
-    DAG dag = new DAG("TezSleepProcessor");
-    Vertex vertex = new Vertex("SleepVertex", new ProcessorDescriptor(
-        SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
-        Resource.newInstance(1024, 1));
-    dag.addVertex(vertex);
-
-    TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
-    Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random
-        .nextInt(100000))));
-    remoteFs.mkdirs(remoteStagingDir);
-    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
-
-    TezClient tezSession = new TezClient("TezSleepProcessor", tezConf, false);
-    tezSession.start();
-
-    DAGClient dagClient = tezSession.submitDAG(dag);
-
-    DAGStatus dagStatus = dagClient.getDAGStatus(null);
-    while (!dagStatus.isCompleted()) {
-      LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
-          + dagStatus.getState());
-      Thread.sleep(500l);
-      dagStatus = dagClient.getDAGStatus(null);
-    }
-    dagStatus = dagClient.getDAGStatus(Sets.newHashSet(StatusGetOpts.GET_COUNTERS));
-
-    assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
-    assertNotNull(dagStatus.getDAGCounters());
-    assertNotNull(dagStatus.getDAGCounters().getGroup(FileSystemCounter.class.getName()));
-    assertNotNull(dagStatus.getDAGCounters().findCounter(TaskCounter.GC_TIME_MILLIS));
-    ExampleDriver.printDAGStatus(dagClient, new String[] { "SleepVertex" }, true, true);
-    tezSession.stop();
-  }
-
-  @Test(timeout = 100000)
-  public void testMultipleDAGsWithDuplicateName() throws TezException, IOException,
-      InterruptedException {
-    TezClient tezSession = null;
-    try {
-      TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
-      Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random
-          .nextInt(100000))));
-      remoteFs.mkdirs(remoteStagingDir);
-      tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
-      tezSession = new TezClient("OrderedWordCountSession", tezConf, true);
-      tezSession.start();
-
-      SleepProcessorConfig spConf = new SleepProcessorConfig(1);
-      for (int dagIndex = 1; dagIndex <= 2; dagIndex++) {
-        DAG dag = new DAG("TezSleepProcessor");
-        Vertex vertex = new Vertex("SleepVertex", new ProcessorDescriptor(
-            SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
-            Resource.newInstance(1024, 1));
-        dag.addVertex(vertex);
-
-        DAGClient dagClient = null;
-        try {
-          dagClient = tezSession.submitDAG(dag);
-          if (dagIndex > 1) {
-            fail("Should fail due to duplicate dag name for dagIndex: " + dagIndex);
-          }
-        } catch (TezException tex) {
-          if (dagIndex > 1) {
-            assertTrue(tex.getMessage().contains("Duplicate dag name "));
-            continue;
-          }
-          fail("DuplicateDAGName exception thrown for 1st DAG submission");
-        }
-        DAGStatus dagStatus = dagClient.getDAGStatus(null);
-        while (!dagStatus.isCompleted()) {
-          LOG.debug("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
-              + dagStatus.getState());
-          Thread.sleep(500l);
-          dagStatus = dagClient.getDAGStatus(null);
-        }
-      }
-    } finally {
-      if (tezSession != null) {
-        tezSession.stop();
-      }
-    }
-  }
-  
-
-  @Test(timeout = 60000)
   public void testIntersectExample() throws Exception {
     IntersectExample intersectExample = new IntersectExample();
     intersectExample.setConf(new Configuration(mrrTezCluster.getConfig()));
@@ -313,102 +202,6 @@ public class TestTezJobs {
       }
     }
   }
-  
-  @Test
-  public void testNonDefaultFSStagingDir() throws Exception {
-    SleepProcessorConfig spConf = new SleepProcessorConfig(1);
-
-    DAG dag = new DAG("TezSleepProcessor");
-    Vertex vertex = new Vertex("SleepVertex", new ProcessorDescriptor(
-        SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
-        Resource.newInstance(1024, 1));
-    dag.addVertex(vertex);
-
-    TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
-    Path stagingDir = new Path(TEST_ROOT_DIR, "testNonDefaultFSStagingDir"
-        + String.valueOf(random.nextInt(100000)));
-    FileSystem localFs = FileSystem.getLocal(tezConf);
-    stagingDir = localFs.makeQualified(stagingDir);
-    localFs.mkdirs(stagingDir);
-    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDir.toString());
-
-    TezClient tezSession = new TezClient("TezSleepProcessor", tezConf, false);
-    tezSession.start();
-
-    DAGClient dagClient = tezSession.submitDAG(dag);
-
-    DAGStatus dagStatus = dagClient.getDAGStatus(null);
-    while (!dagStatus.isCompleted()) {
-      LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
-          + dagStatus.getState());
-      Thread.sleep(500l);
-      dagStatus = dagClient.getDAGStatus(null);
-    }
-    dagStatus = dagClient.getDAGStatus(Sets.newHashSet(StatusGetOpts.GET_COUNTERS));
-
-    assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
-    assertNotNull(dagStatus.getDAGCounters());
-    assertNotNull(dagStatus.getDAGCounters().getGroup(FileSystemCounter.class.getName()));
-    assertNotNull(dagStatus.getDAGCounters().findCounter(TaskCounter.GC_TIME_MILLIS));
-    ExampleDriver.printDAGStatus(dagClient, new String[] { "SleepVertex" }, true, true);
-    tezSession.stop();
-  }
-
-  // Submits a simple 5 stage sleep job using tez session. Then kills it.
-  @Test(timeout = 60000)
-  public void testHistoryLogging() throws IOException,
-      InterruptedException, TezException, ClassNotFoundException, YarnException {
-    SleepProcessorConfig spConf = new SleepProcessorConfig(1);
-
-    DAG dag = new DAG("TezSleepProcessorHistoryLogging");
-    Vertex vertex = new Vertex("SleepVertex", new ProcessorDescriptor(
-        SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 2,
-        Resource.newInstance(1024, 1));
-    dag.addVertex(vertex);
-
-    TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
-    Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random
-        .nextInt(100000))));
-    remoteFs.mkdirs(remoteStagingDir);
-    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
-
-    FileSystem localFs = FileSystem.getLocal(tezConf);
-    Path historyLogDir = new Path(TEST_ROOT_DIR, "testHistoryLogging");
-    localFs.mkdirs(historyLogDir);
-
-    tezConf.set(TezConfiguration.TEZ_SIMPLE_HISTORY_LOGGING_DIR,
-        localFs.makeQualified(historyLogDir).toString());
-
-    tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, false);
-    TezClient tezSession = new TezClient("TezSleepProcessorHistoryLogging", tezConf);
-    tezSession.start();
-
-    DAGClient dagClient = tezSession.submitDAG(dag);
-
-    DAGStatus dagStatus = dagClient.getDAGStatus(null);
-    while (!dagStatus.isCompleted()) {
-      LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
-          + dagStatus.getState());
-      Thread.sleep(500l);
-      dagStatus = dagClient.getDAGStatus(null);
-    }
-    assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
-
-    FileStatus historyLogFileStatus = null;
-    for (FileStatus fileStatus : localFs.listStatus(historyLogDir)) {
-      if (fileStatus.isDirectory()) {
-        continue;
-      }
-      Path p = fileStatus.getPath();
-      if (p.getName().startsWith(SimpleHistoryLoggingService.LOG_FILE_NAME_PREFIX)) {
-        historyLogFileStatus = fileStatus;
-        break;
-      }
-    }
-    Assert.assertNotNull(historyLogFileStatus);
-    Assert.assertTrue(historyLogFileStatus.getLen() > 0);
-    tezSession.stop();
-  }
 
   private void generateOrderedWordCountInput(Path inputDir) throws IOException {
     Path dataPath1 = new Path(inputDir, "inPath1");
@@ -481,40 +274,14 @@ public class TestTezJobs {
     Path outputDir = new Path(outputDirStr);
 
     TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
-    Path simpleLogPath = new Path("/tmp/owc-logging/");
-    remoteFs.mkdirs(simpleLogPath);
-    simpleLogPath = remoteFs.resolvePath(simpleLogPath);
-    tezConf.set(TezConfiguration.TEZ_SIMPLE_HISTORY_LOGGING_DIR, simpleLogPath.toString());
     tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString());
     TezClient tezSession = null;
 
     try {
-      tezSession = new TezClient("OrderedWordCountSession", tezConf);
-      tezSession.start();
-      tezSession.waitTillReady();
-
-      Map<String, LocalResource> localResourceMap = new TreeMap<String, LocalResource>();
 
-      OrderedWordCount orderedWordCount = new OrderedWordCount();
-      DAG dag = orderedWordCount.createDAG(remoteFs, tezConf, localResourceMap, stagingDirPath,
-          1, inputDirStr, outputDirStr, false);
+      OrderedWordCount job = new OrderedWordCount();
+      Assert.assertTrue("OrderedWordCount failed", job.run(inputDirStr, outputDirStr, tezConf, 2));
 
-      DAGClient dagClient = tezSession.submitDAG(dag);
-      DAGStatus dagStatus = dagClient.getDAGStatus(null);
-      while (!dagStatus.isCompleted()) {
-        LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
-            + dagStatus.getState());
-        Thread.sleep(500l);
-        dagStatus = dagClient.getDAGStatus(null);
-      }
-      dagStatus = dagClient.getDAGStatus(Sets.newHashSet(StatusGetOpts.GET_COUNTERS));
-
-      assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
-      assertTrue(remoteFs.exists(outputDir));
-      if (tezSession != null) {
-        tezSession.stop();
-        tezSession = null;
-      }
 
       FileStatus[] fileStatuses = remoteFs.listStatus(outputDir);
       Path resultFile = null;
@@ -542,13 +309,8 @@ public class TestTezJobs {
       assertTrue(resultFile != null);
       assertTrue(foundSuccessFile);
       verifyOrderedWordCountOutput(resultFile);
-
-      // check simple history log exists
-      FileStatus[] fileStatuses1 = remoteFs.listStatus(simpleLogPath);
-      Assert.assertEquals(1, fileStatuses1.length);
-      Assert.assertTrue(fileStatuses1[0].getPath().getName().startsWith(
-          SimpleHistoryLoggingService.LOG_FILE_NAME_PREFIX));
     } finally {
+      remoteFs.delete(stagingDirPath, true);
       if (tezSession != null) {
         tezSession.stop();
       }


Mime
View raw message