tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeag...@apache.org
Subject tez git commit: TEZ-1917. Examples should extend TezExampleBase. Contributed by Vasanth kumar RJ
Date Mon, 09 Feb 2015 20:36:37 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.6 cc9306502 -> 9bf99b9a9


TEZ-1917. Examples should extend TezExampleBase. Contributed by Vasanth kumar RJ


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/9bf99b9a
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/9bf99b9a
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/9bf99b9a

Branch: refs/heads/branch-0.6
Commit: 9bf99b9a99b713afaccebb538e09ca3e6937fdac
Parents: cc93065
Author: Siddharth Seth <sseth@apache.org>
Authored: Tue Jan 13 11:00:36 2015 -0800
Committer: Jonathan Eagles <jeagles@gmail.com>
Committed: Mon Feb 9 14:34:54 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/tez/examples/HashJoinExample.java    |  92 ++--------
 .../org/apache/tez/examples/JoinDataGen.java    |  85 ++--------
 .../org/apache/tez/examples/JoinValidate.java   |  79 ++-------
 .../apache/tez/examples/OrderedWordCount.java   |  73 +++-----
 .../tez/examples/SimpleSessionExample.java      | 116 +++++--------
 .../tez/examples/SortMergeJoinExample.java      |  93 ++--------
 .../org/apache/tez/examples/TezExampleBase.java | 170 +++++++++++++++++++
 .../java/org/apache/tez/examples/WordCount.java |  82 +++------
 tez-tests/pom.xml                               |   1 -
 .../mapreduce/examples/BroadcastLoadGen.java    |   1 +
 .../tez/mapreduce/examples/RPCLoadGen.java      |   1 +
 .../tez/mapreduce/examples/TezExampleBase.java  | 169 ------------------
 .../java/org/apache/tez/test/TestTezJobs.java   |   9 +-
 14 files changed, 312 insertions(+), 660 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/9bf99b9a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6460765..352d615 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.6.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-1917. Examples should extend TezExampleBase.
   TEZ-2056. Tez UI: fix VertexID filter,show only tez configs by default,fix appattemptid.
   TEZ-2052. Tez UI: log view fixes, show version from build, better handling of ats url config.
   TEZ-2043. Tez UI: add progress info from am webservice to dag and vertex views.

http://git-wip-us.apache.org/repos/asf/tez/blob/9bf99b9a/tez-examples/src/main/java/org/apache/tez/examples/HashJoinExample.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/HashJoinExample.java b/tez-examples/src/main/java/org/apache/tez/examples/HashJoinExample.java
index e723b1f..fa99d4b 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/HashJoinExample.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/HashJoinExample.java
@@ -25,16 +25,12 @@ import java.util.Set;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.DAG;
@@ -42,17 +38,14 @@ import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.Reader;
 import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.api.Reader;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
 import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig;
@@ -78,7 +71,7 @@ import com.google.common.base.Preconditions;
  * fragments. Then the keys in the same fragment are joined with each other.
  * This is the default join strategy.
  */
-public class HashJoinExample extends Configured implements Tool {
+public class HashJoinExample extends TezExampleBase {
 
   private static final Log LOG = LogFactory.getLog(HashJoinExample.class);
 
@@ -95,7 +88,8 @@ public class HashJoinExample extends Configured implements Tool {
     System.exit(status);
   }
 
-  private static void printUsage() {
+  @Override
+  protected void printUsage() {
     System.err.println("Usage: "
         + "hashjoin <file1> <file2> <numPartitions> <outPath> ["
         + broadcastOption + "(default false)]");
@@ -103,73 +97,13 @@ public class HashJoinExample extends Configured implements Tool {
   }
 
   @Override
-  public int run(String[] args) throws Exception {
-    Configuration conf = getConf();
-    String[] otherArgs =
-        new GenericOptionsParser(conf, args).getRemainingArgs();
-    int result = validateArgs(otherArgs);
-    if (result != 0) {
-      return result;
-    }
-    return execute(otherArgs);
-  }
-
-  public int run(Configuration conf, String[] args, TezClient tezClient)
-      throws Exception {
-    setConf(conf);
-    String[] otherArgs =
-        new GenericOptionsParser(conf, args).getRemainingArgs();
-    int result = validateArgs(otherArgs);
-    if (result != 0) {
-      return result;
-    }
-    return execute(otherArgs, tezClient);
-  }
+  protected int runJob(String[] args, TezConfiguration tezConf,
+      TezClient tezClient) throws Exception {
 
-  private int validateArgs(String[] otherArgs) {
-    if (!(otherArgs.length == 4 || otherArgs.length == 5)) {
-      printUsage();
-      return 2;
-    }
-    return 0;
-  }
-
-  private int execute(String[] args) throws TezException, IOException,
-      InterruptedException {
-    TezConfiguration tezConf = new TezConfiguration(getConf());
-    TezClient tezClient = null;
-    try {
-      tezClient = createTezClient(tezConf);
-      return execute(args, tezConf, tezClient);
-    } finally {
-      if (tezClient != null) {
-        tezClient.stop();
-      }
-    }
-  }
-
-  private int execute(String[] args, TezClient tezClient) throws IOException,
-      TezException, InterruptedException {
-    TezConfiguration tezConf = new TezConfiguration(getConf());
-    return execute(args, tezConf, tezClient);
-  }
-
-  private TezClient createTezClient(TezConfiguration tezConf)
-      throws TezException, IOException {
-    TezClient tezClient = TezClient.create("HashJoinExample", tezConf);
-    tezClient.start();
-    return tezClient;
-  }
-
-  private int execute(String[] args, TezConfiguration tezConf,
-      TezClient tezClient) throws IOException, TezException,
-      InterruptedException {
     boolean doBroadcast =
         args.length == 5 && args[4].equals(broadcastOption) ? true : false;
     LOG.info("Running HashJoinExample" + (doBroadcast ? "-WithBroadcast" : ""));
 
-    UserGroupInformation.setConfiguration(tezConf);
-
     String streamInputDir = args[0];
     String hashInputDir = args[1];
     int numPartitions = Integer.parseInt(args[2]);
@@ -194,15 +128,15 @@ public class HashJoinExample extends Configured implements Tool {
         createDag(tezConf, streamInputPath, hashInputPath, outputPath,
             numPartitions, doBroadcast);
 
-    tezClient.waitTillReady();
-    DAGClient dagClient = tezClient.submitDAG(dag);
-    DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(null);
-    if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
-      LOG.info("DAG diagnostics: " + dagStatus.getDiagnostics());
-      return -1;
+    return runDag(dag, false, LOG);
+  }
+
+  @Override
+  protected int validateArgs(String[] otherArgs) {
+    if (!(otherArgs.length == 4 || otherArgs.length == 5)) {
+      return 2;
     }
     return 0;
-
   }
 
   private DAG createDag(TezConfiguration tezConf, Path streamPath,

http://git-wip-us.apache.org/repos/asf/tez/blob/9bf99b9a/tez-examples/src/main/java/org/apache/tez/examples/JoinDataGen.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/JoinDataGen.java b/tez-examples/src/main/java/org/apache/tez/examples/JoinDataGen.java
index ff73247..8df452f 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/JoinDataGen.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/JoinDataGen.java
@@ -29,25 +29,18 @@ import java.util.Random;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
 import org.apache.tez.runtime.api.ProcessorContext;
@@ -55,7 +48,7 @@ import org.apache.tez.runtime.library.api.KeyValueWriter;
 
 import com.google.common.base.Preconditions;
 
-public class JoinDataGen extends Configured implements Tool {
+public class JoinDataGen extends TezExampleBase {
 
   private static final Log LOG = LogFactory.getLog(JoinDataGen.class);
 
@@ -69,7 +62,8 @@ public class JoinDataGen extends Configured implements Tool {
     System.exit(status);
   }
 
-  private static void printUsage() {
+  @Override
+  protected void printUsage() {
     System.err
         .println("Usage: "
             + "joindatagen <outPath1> <path1Size> <outPath2> <path2Size> <expectedResultPath> <numTasks>");
@@ -77,65 +71,10 @@ public class JoinDataGen extends Configured implements Tool {
   }
 
   @Override
-  public int run(String[] args) throws Exception {
-    Configuration conf = getConf();
-    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
-    int result = validateArgs(otherArgs);
-    if (result != 0) {
-      return result;
-    }
-    return execute(otherArgs);
-  }
-  
-  public int run(Configuration conf, String[] args, TezClient tezClient) throws Exception {
-    setConf(conf);
-    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
-    int result = validateArgs(otherArgs);
-    if (result != 0) {
-      return result;
-    }
-    return execute(otherArgs, tezClient);
-  }
-  
-  private int validateArgs(String[] otherArgs) {
-    if (otherArgs.length != 6) {
-      printUsage();
-      return 2;
-    }
-    return 0;
-  }
-
-  private int execute(String [] args) throws TezException, IOException, InterruptedException {
-    TezConfiguration tezConf = new TezConfiguration(getConf());
-    TezClient tezClient = null;
-    try {
-      tezClient = createTezClient(tezConf);
-      return execute(args, tezConf, tezClient);
-    } finally {
-      if (tezClient != null) {
-        tezClient.stop();
-      }
-    }
-  }
-  
-  private int execute(String[] args, TezClient tezClient) throws IOException, TezException,
-      InterruptedException {
-    TezConfiguration tezConf = new TezConfiguration(getConf());
-    return execute(args, tezConf, tezClient);
-  }
-  
-  private TezClient createTezClient(TezConfiguration tezConf) throws TezException, IOException {
-    TezClient tezClient = TezClient.create("JoinDataGen", tezConf);
-    tezClient.start();
-    return tezClient;
-  }
-  
-  private int execute(String[] args, TezConfiguration tezConf, TezClient tezClient)
-      throws IOException, TezException, InterruptedException {
+  protected int runJob(String[] args, TezConfiguration tezConf,
+      TezClient tezClient) throws Exception {
     LOG.info("Running JoinDataGen");
 
-    UserGroupInformation.setConfiguration(tezConf);
-
     String outDir1 = args[0];
     long outDir1Size = Long.parseLong(args[1]);
     String outDir2 = args[2];
@@ -179,15 +118,15 @@ public class JoinDataGen extends Configured implements Tool {
     DAG dag = createDag(tezConf, largeOutPath, smallOutPath, expectedOutputPath, numTasks,
         largeOutSize, smallOutSize);
 
-    tezClient.waitTillReady();
-    DAGClient dagClient = tezClient.submitDAG(dag);
-    DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(null);
-    if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
-      LOG.info("DAG diagnostics: " + dagStatus.getDiagnostics());
-      return -1;
+    return runDag(dag, false, LOG);
+  }
+
+  @Override
+  protected int validateArgs(String[] otherArgs) {
+    if (otherArgs.length != 6) {
+      return 2;
     }
     return 0;
-
   }
 
   private DAG createDag(TezConfiguration tezConf, Path largeOutPath, Path smallOutPath,

http://git-wip-us.apache.org/repos/asf/tez/blob/9bf99b9a/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java b/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
index 9770f6f..fb9127e 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
@@ -23,14 +23,10 @@ import java.io.IOException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.common.counters.TezCounter;
@@ -38,7 +34,6 @@ import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
@@ -46,8 +41,8 @@ import org.apache.tez.dag.api.client.StatusGetOpts;
 import org.apache.tez.examples.HashJoinExample.ForwardingProcessor;
 import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.runtime.api.LogicalInput;
-import org.apache.tez.runtime.api.Reader;
 import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.api.Reader;
 import org.apache.tez.runtime.library.api.KeyValuesReader;
 import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
 import org.apache.tez.runtime.library.partitioner.HashPartitioner;
@@ -56,7 +51,7 @@ import org.apache.tez.runtime.library.processor.SimpleProcessor;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
 
-public class JoinValidate extends Configured implements Tool {
+public class JoinValidate extends TezExampleBase {
   private static final Log LOG = LogFactory.getLog(JoinValidate.class);
 
   private static final String LHS_INPUT_NAME = "lhsfile";
@@ -71,69 +66,17 @@ public class JoinValidate extends Configured implements Tool {
     System.exit(status);
   }
 
-  private static void printUsage() {
+  @Override
+  protected void printUsage() {
     System.err.println("Usage: " + "joinvalidate <path1> <path2>");
     ToolRunner.printGenericCommandUsage(System.err);
   }
 
   @Override
-  public int run(String[] args) throws Exception {
-    Configuration conf = getConf();
-    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
-    int result = validateArgs(otherArgs);
-    if (result != 0) {
-      return result;
-    }
-    return execute(otherArgs);
-  }
-  
-  public int run(Configuration conf, String[] args, TezClient tezClient) throws Exception {
-    setConf(conf);
-    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
-    int result = validateArgs(otherArgs);
-    if (result != 0) {
-      return result;
-    }
-    return execute(otherArgs, tezClient);
-  } 
-
-  private int validateArgs(String[] otherArgs) {
-    if (otherArgs.length != 3 && otherArgs.length != 2) {
-      printUsage();
-      return 2;
-    }
-    return 0;
-  }
-
-  private int execute(String[] args) throws TezException, IOException, InterruptedException {
-    TezConfiguration tezConf = new TezConfiguration(getConf());
-    TezClient tezClient = null;
-    try {
-      tezClient = createTezClient(tezConf);
-      return execute(args, tezConf, tezClient);
-    } finally {
-      if (tezClient != null) {
-        tezClient.stop();
-      }
-    }
-  }
-  
-  private int execute(String[] args, TezClient tezClient) throws IOException, TezException,
-      InterruptedException {
-    TezConfiguration tezConf = new TezConfiguration(getConf());
-    return execute(args, tezConf, tezClient);
-  }
-  
-  private TezClient createTezClient(TezConfiguration tezConf) throws TezException, IOException {
-    TezClient tezClient = TezClient.create("JoinValidate", tezConf);
-    tezClient.start();
-    return tezClient;
-  }
+  protected int runJob(String[] args, TezConfiguration tezConf,
+      TezClient tezClient) throws Exception {
 
-  private int execute(String[] args, TezConfiguration tezConf, TezClient tezClient)
-      throws IOException, TezException, InterruptedException {
     LOG.info("Running JoinValidate");
-    UserGroupInformation.setConfiguration(tezConf);
 
     String lhsDir = args[0];
     String rhsDir = args[1];
@@ -175,6 +118,15 @@ public class JoinValidate extends Configured implements Tool {
         }
       }
     }
+  
+  }
+
+  @Override
+  protected int validateArgs(String[] otherArgs) {
+    if (otherArgs.length != 3 && otherArgs.length != 2) {
+      return 2;
+    }
+    return 0;
   }
 
   private DAG createDag(TezConfiguration tezConf, Path lhs, Path rhs, int numPartitions)
@@ -256,5 +208,4 @@ public class JoinValidate extends Configured implements Tool {
       }
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/9bf99b9a/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java b/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
index fbe9a88..9cf21d3 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
@@ -20,15 +20,13 @@ package org.apache.tez.examples;
 
 import java.io.IOException;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.DAG;
@@ -38,8 +36,6 @@ import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.examples.WordCount.TokenProcessor;
 import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.output.MROutput;
@@ -57,14 +53,15 @@ import com.google.common.base.Preconditions;
  * Simple example that extends the WordCount example to show a chain of processing.
  * The example extends WordCount by sorting the words by their count.
  */
-public class OrderedWordCount extends Configured implements Tool  {
+public class OrderedWordCount extends TezExampleBase {
   
   private static String INPUT = WordCount.INPUT;
   private static String OUTPUT = WordCount.OUTPUT;
   private static String TOKENIZER = WordCount.TOKENIZER;
   private static String SUMMATION = WordCount.SUMMATION;
   private static String SORTER = "Sorter";
-  
+  private static final Log LOG = LogFactory.getLog(OrderedWordCount.class);
+
   /*
    * SumProcessor similar to WordCount except that it writes the count as key and the 
    * word as value. This is because we can and ordered partitioned key value edge to group the 
@@ -180,60 +177,30 @@ public class OrderedWordCount extends Configured implements Tool  {
             Edge.create(summationVertex, sorterVertex, sorterEdgeConf.createDefaultEdgeProperty()));
     return dag;  
   }
-  
-  private static void printUsage() {
+
+  @Override
+  protected void printUsage() {
     System.err.println("Usage: " + " orderedwordcount in out [numPartitions]");
     ToolRunner.printGenericCommandUsage(System.err);
   }
 
-  public boolean run(String inputPath, String outputPath, Configuration conf,
-      int numPartitions) throws Exception {
-    System.out.println("Running OrderedWordCount");
-    TezConfiguration tezConf;
-    if (conf != null) {
-      tezConf = new TezConfiguration(conf);
-    } else {
-      tezConf = new TezConfiguration();
-    }
-    
-    UserGroupInformation.setConfiguration(tezConf);
-    
-    TezClient tezClient = TezClient.create("OrderedWordCount", tezConf);
-    tezClient.start();
-
-    try {
-        DAG dag = createDAG(tezConf, inputPath, outputPath, numPartitions, "OrderedWordCount");
-
-        tezClient.waitTillReady();
-        DAGClient dagClient = tezClient.submitDAG(dag);
-
-        // monitoring
-        DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(null);
-        if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
-          System.out.println("OrderedWordCount failed with diagnostics: " + dagStatus.getDiagnostics());
-          return false;
-        }
-        return true;
-    } finally {
-      tezClient.stop();
-    }
-  }
 
   @Override
-  public int run(String[] args) throws Exception {
-    Configuration conf = getConf();
-    String [] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
-
+  protected int validateArgs(String[] otherArgs) {
     if (otherArgs.length < 2 || otherArgs.length > 3) {
-      printUsage();
       return 2;
     }
-    OrderedWordCount job = new OrderedWordCount();
-    if (job.run(otherArgs[0], otherArgs[1], conf,
-        (otherArgs.length == 3 ? Integer.parseInt(otherArgs[2]) : 1))) {
-      return 0;
-    }
-    return 1;
+    return 0;
+  }
+
+  @Override
+  protected int runJob(String[] args, TezConfiguration tezConf,
+      TezClient tezClient) throws Exception {
+    DAG dag = createDAG(tezConf, args[0], args[1],
+        args.length == 3 ? Integer.parseInt(args[2]) : 1,
+        "OrderedWordCount");
+    LOG.info("Running OrderedWordCount");
+    return runDag(dag, false, LOG);
   }
 
   public static void main(String[] args) throws Exception {

http://git-wip-us.apache.org/repos/asf/tez/blob/9bf99b9a/tez-examples/src/main/java/org/apache/tez/examples/SimpleSessionExample.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/SimpleSessionExample.java b/tez-examples/src/main/java/org/apache/tez/examples/SimpleSessionExample.java
index fd70dad..ee81c6b 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/SimpleSessionExample.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/SimpleSessionExample.java
@@ -18,17 +18,13 @@
 
 package org.apache.tez.examples;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.util.Tool;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.PreWarmVertex;
 import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.dag.api.client.DAGStatus;
 
 /**
  * Simple example that shows how Tez session mode can be used to run multiple DAGs in the same 
@@ -39,27 +35,37 @@ import org.apache.tez.dag.api.client.DAGStatus;
  * In this example we will be submitting multiple OrderedWordCount DAGs on different inputs to the 
  * same session.
  */
-public class SimpleSessionExample extends Configured implements Tool {
-  
+public class SimpleSessionExample extends TezExampleBase {
+
+  private static final Log LOG = LogFactory.getLog(SimpleSessionExample.class);
   private static final String enablePrewarmConfig = "simplesessionexample.prewarm";
 
-  public boolean run(String[] inputPaths, String[] outputPaths, Configuration conf,
-      int numPartitions) throws Exception {
-    TezConfiguration tezConf;
-    if (conf != null) {
-      tezConf = new TezConfiguration(conf);
-    } else {
-      tezConf = new TezConfiguration();
+  @Override
+  protected void printUsage() {
+    System.err.println("Usage: " + " simplesessionexample <in1,in2> <out1, out2> [numPartitions]");
+    ToolRunner.printGenericCommandUsage(System.err);
+  }
+
+  @Override
+  protected int validateArgs(String[] otherArgs) {
+    if (otherArgs.length < 2 || otherArgs.length > 3) {
+      return 2;
+    }
+    return 0;
+  }
+
+  @Override
+  protected int runJob(String[] args, TezConfiguration tezConf,
+      TezClient tezClient) throws Exception {
+    System.out.println("Running SimpleSessionExample");
+    String[] inputPaths = args[0].split(",");
+    String[] outputPaths = args[1].split(",");
+    if (inputPaths.length != outputPaths.length) {
+      System.err.println("Inputs and outputs must be equal in number");
+      return 3;
     }
-    
-    // start TezClient in session mode. The same code run in session mode or non-session mode. The 
-    // mode can be changed via configuration. However if the application wants to run exclusively in 
-    // session mode then it can do so in code directly using the appropriate constructor
-    
-    // tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true); // via config OR via code
-    TezClient tezClient = TezClient.create("SimpleSessionExample", tezConf, true);
-    tezClient.start();
-    
+    int numPartitions = args.length == 3 ? Integer.parseInt(args[2]) : 1;
+
     // Session pre-warming allows the user to hide initial startup, resource acquisition latency etc.
     // by pre-allocating execution resources in the Tez session. They can run initialization logic 
     // in these pre-allocated resources (containers) to pre-warm the containers.
@@ -79,64 +85,22 @@ public class SimpleSessionExample extends Configured implements Tool {
       tezClient.preWarm(PreWarmVertex.createConfigBuilder(tezConf).build());
     }
 
-    // the remaining code is the same as submitting any DAG.
-    try {
-      for (int i=0; i<inputPaths.length; ++i) {
-        DAG dag = OrderedWordCount.createDAG(tezConf, inputPaths[i], outputPaths[i], numPartitions,
-            ("DAG-Iteration-" + i)); // the names of the DAGs must be unique in a session
+    for (int i = 0; i < inputPaths.length; ++i) {
+      DAG dag = OrderedWordCount.createDAG(tezConf, inputPaths[i], outputPaths[i], numPartitions,
+          ("DAG-Iteration-" + i)); // the names of the DAGs must be unique in a session
 
-        tezClient.waitTillReady();
-        System.out.println("Running dag number " + i);
-        DAGClient dagClient = tezClient.submitDAG(dag);
-
-        // wait to finish
-        DAGStatus dagStatus = dagClient.waitForCompletion();
-        if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
-          System.out.println("Iteration " + i + " failed with diagnostics: "
-              + dagStatus.getDiagnostics());
-          return false;
-        }
+      LOG.info("Running dag number " + i);
+      if(runDag(dag, false, LOG) != 0) {
+        return -1;
       }
-      return true;
-    } finally {
-      tezClient.stop();
-    }
-  }
-
-  private static void printUsage() {
-    System.err.println("Usage: " + " simplesessionexample <in1,in2> <out1, out2> [numPartitions]");
-    ToolRunner.printGenericCommandUsage(System.err);
-  }
-  
-  @Override
-  public int run(String[] args) throws Exception {
-    System.out.println("Running SimpleSessionExample");
-    Configuration conf = getConf();
-    String [] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
-
-    if (otherArgs.length < 2 || otherArgs.length > 3) {
-      printUsage();
-      return 2;
     }
-    
-    String[] inputPaths = otherArgs[0].split(",");
-    String[] outputPaths = otherArgs[1].split(",");
-    if (inputPaths.length != outputPaths.length) {
-      System.err.println("Inputs and outputs must be equal in number");
-      return 3;
-    }
-    
-    SimpleSessionExample job = new SimpleSessionExample();
-    if (job.run(inputPaths, outputPaths, conf,
-        (otherArgs.length == 3 ? Integer.parseInt(otherArgs[2]) : 1))) {
-      return 0;
-    }
-    return 1;
+    return 0;
   }
 
   public static void main(String[] args) throws Exception {
-    int res = ToolRunner.run(new Configuration(), new SimpleSessionExample(), args);
+    TezConfiguration tezConf = new TezConfiguration();
+    tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true);
+    int res = ToolRunner.run(tezConf, new SimpleSessionExample(), args);
     System.exit(res);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/9bf99b9a/tez-examples/src/main/java/org/apache/tez/examples/SortMergeJoinExample.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/SortMergeJoinExample.java b/tez-examples/src/main/java/org/apache/tez/examples/SortMergeJoinExample.java
index 6b8dfea..b3336d2 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/SortMergeJoinExample.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/SortMergeJoinExample.java
@@ -23,26 +23,19 @@ import java.io.IOException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
 import org.apache.tez.examples.HashJoinExample.ForwardingProcessor;
 import org.apache.tez.mapreduce.input.MRInput;
@@ -76,7 +69,7 @@ import com.google.common.base.Preconditions;
  * in the hashFile are unique. while for {@link SortMergeJoinExample} it is
  * required that keys in the both 2 datasets are unique.
  */
-public class SortMergeJoinExample extends Configured implements Tool {
+public class SortMergeJoinExample extends TezExampleBase {
 
   private static final Log LOG = LogFactory.getLog(SortMergeJoinExample.class);
 
@@ -92,77 +85,16 @@ public class SortMergeJoinExample extends Configured implements Tool {
     System.exit(status);
   }
 
-  private static void printUsage() {
+  @Override
+  protected void printUsage() {
     System.err.println("Usage: "
         + "sortmergejoin <file1> <file2> <numPartitions> <outPath>");
     ToolRunner.printGenericCommandUsage(System.err);
   }
 
   @Override
-  public int run(String[] args) throws Exception {
-    Configuration conf = getConf();
-    String[] otherArgs =
-        new GenericOptionsParser(conf, args).getRemainingArgs();
-    int result = validateArgs(otherArgs);
-    if (result != 0) {
-      return result;
-    }
-    return execute(otherArgs);
-  }
-
-  public int run(Configuration conf, String[] args, TezClient tezClient)
-      throws Exception {
-    setConf(conf);
-    String[] otherArgs =
-        new GenericOptionsParser(conf, args).getRemainingArgs();
-    int result = validateArgs(otherArgs);
-    if (result != 0) {
-      return result;
-    }
-    return execute(otherArgs, tezClient);
-  }
-
-  private int validateArgs(String[] otherArgs) {
-    if (otherArgs.length != 4) {
-      printUsage();
-      return 2;
-    }
-    return 0;
-  }
-
-  private int execute(String[] args) throws TezException, IOException,
-      InterruptedException {
-    TezConfiguration tezConf = new TezConfiguration(getConf());
-    TezClient tezClient = null;
-    try {
-      tezClient = createTezClient(tezConf);
-      return execute(args, tezConf, tezClient);
-    } finally {
-      if (tezClient != null) {
-        tezClient.stop();
-      }
-    }
-  }
-
-  private int execute(String[] args, TezClient tezClient) throws IOException,
-      TezException, InterruptedException {
-    TezConfiguration tezConf = new TezConfiguration(getConf());
-    return execute(args, tezConf, tezClient);
-  }
-
-  private TezClient createTezClient(TezConfiguration tezConf)
-      throws TezException, IOException {
-    TezClient tezClient = TezClient.create("SortMergeJoinExample", tezConf);
-    tezClient.start();
-    return tezClient;
-  }
-
-  private int execute(String[] args, TezConfiguration tezConf,
-      TezClient tezClient) throws IOException, TezException,
-      InterruptedException {
-    LOG.info("Running SortMergeJoinExample");
-
-    UserGroupInformation.setConfiguration(tezConf);
+  protected int runJob(String[] args, TezConfiguration tezConf,
+      TezClient tezClient) throws Exception {
 
     String inputDir1 = args[0];
     String inputDir2 = args[1];
@@ -183,19 +115,18 @@ public class SortMergeJoinExample extends Configured implements Tool {
       System.err.println("NumPartitions must be > 0");
       return 4;
     }
-
     DAG dag =
         createDag(tezConf, inputPath1, inputPath2, outputPath, numPartitions);
+    LOG.info("Running SortMergeJoinExample");
+    return runDag(dag, false, LOG);
+  }
 
-    tezClient.waitTillReady();
-    DAGClient dagClient = tezClient.submitDAG(dag);
-    DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(null);
-    if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
-      LOG.info("DAG diagnostics: " + dagStatus.getDiagnostics());
-      return -1;
+  @Override
+  protected int validateArgs(String[] otherArgs) {
+    if (otherArgs.length != 4) {
+      return 2;
     }
     return 0;
-
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/tez/blob/9bf99b9a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
new file mode 100644
index 0000000..4bad009
--- /dev/null
+++ b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.examples;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.StatusGetOpts;
+/** Base for the Tez examples: parses and validates args, manages a TezClient, calls runJob(). */
+@InterfaceAudience.Private
+public abstract class TezExampleBase extends Configured implements Tool {
+
+  private TezClient tezClientInternal; // assigned by _execute(); the client runDag() submits to
+
+  @Override
+  public final int run(String[] args) throws Exception { // Tool entry point, e.g. via ToolRunner
+    Configuration conf = getConf();
+    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); // strip generic Hadoop options
+    return _execute(otherArgs, null, null); // conf and client are both derived from getConf()
+  }
+
+  /**
+   * Utility method to use the example from within code or a test.
+   *
+   * @param conf      the tez configuration instance which will be used to create the DAG and
+   *                  possibly the Tez client; it is also installed as this Tool's configuration.
+   * @param args      arguments to the example; generic Hadoop options are parsed out first
+   * @param tezClient an existing running {@link org.apache.tez.client.TezClient} instance if one
+   *                  exists. If no TezClient is specified (null), one will be created based on the
+   *                  provided configuration (and stopped afterwards); a supplied client is not stopped.
+   * @return Zero indicates success, non-zero indicates failure
+   * @throws Exception if argument parsing, client creation or the job itself fails
+   */
+  public int run(TezConfiguration conf, String[] args, @Nullable TezClient tezClient) throws
+      Exception {
+    setConf(conf);
+    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+    return _execute(otherArgs, conf, tezClient);
+  }
+
+  /** Submits {@code dag} on the current TezClient and waits for it to finish.
+   * @param dag           the dag to execute
+   * @param printCounters whether to request DAG counters along with the status updates
+   * @param logger        the logger used to report diagnostics when the DAG does not succeed
+   * @return Zero if the DAG SUCCEEDED, -1 otherwise
+   * @throws TezException
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  public int runDag(DAG dag, boolean printCounters, Log logger) throws TezException,
+      InterruptedException, IOException {
+    tezClientInternal.waitTillReady(); // null until _execute() assigns it: call from runJob()
+    DAGClient dagClient = tezClientInternal.submitDAG(dag);
+    Set<StatusGetOpts> getOpts = Sets.newHashSet();
+    if (printCounters) {
+      getOpts.add(StatusGetOpts.GET_COUNTERS);
+    }
+
+    DAGStatus dagStatus;
+    dagStatus = dagClient.waitForCompletionWithStatusUpdates(getOpts); // blocks until the DAG is done
+
+    if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
+      logger.info("DAG diagnostics: " + dagStatus.getDiagnostics());
+      return -1;
+    }
+    return 0;
+  }
+
+  private int _validateArgs(String[] args) { // validateArgs() plus printUsage() on failure
+    int res = validateArgs(args);
+    if (res != 0) {
+      printUsage();
+      return res;
+    }
+    return 0;
+  }
+
+  private int _execute(String[] otherArgs, TezConfiguration tezConf, TezClient tezClient) throws
+      Exception { // shared by both run() overloads
+
+    int result = _validateArgs(otherArgs);
+    if (result != 0) {
+      return result;
+    }
+
+    if (tezConf == null) {
+      tezConf = new TezConfiguration(getConf());
+    }
+    UserGroupInformation.setConfiguration(tezConf);
+    boolean ownTezClient = false;
+    if (tezClient == null) {
+      ownTezClient = true; // created here, so it is stopped in the finally block below
+      tezClientInternal = createTezClient(tezConf);
+    } else {
+      tezClientInternal = tezClient;
+    }
+    try {
+      return runJob(otherArgs, tezConf, tezClientInternal);
+    } finally {
+      if (ownTezClient && tezClientInternal != null) { // a caller-supplied client is left running
+        tezClientInternal.stop();
+      }
+    }
+  }
+
+  private TezClient createTezClient(TezConfiguration tezConf) throws IOException, TezException {
+    TezClient tezClient = TezClient.create(getClass().getSimpleName(), tezConf); // named after the example class
+    tezClient.start();
+    return tezClient;
+  }
+
+  /**
+   * Print usage instructions for this example; called when validateArgs returns non-zero.
+   */
+  protected abstract void printUsage();
+
+  /**
+   * Validate the arguments
+   *
+   * @param otherArgs arguments left after generic Hadoop options are removed
+   * @return Zero indicates success; a non-zero value is returned by run() as its result
+   */
+  protected abstract int validateArgs(String[] otherArgs);
+
+  /**
+   * Create and execute the actual DAG for the example
+   *
+   * @param args      arguments for execution (already accepted by validateArgs)
+   * @param tezConf   the tez configuration instance to be used while processing the DAG
+   * @param tezClient the tez client instance to use to run the DAG if any custom monitoring is
+   *                  required. Otherwise the utility method {@link #runDag(org.apache.tez.dag.api.DAG,
+   *                  boolean, org.apache.commons.logging.Log)} should be used
+   * @return Zero indicates success, non-zero indicates failure
+   * @throws Exception if the DAG cannot be created or run; a TezClient created by the base
+   *                   class is still stopped
+   */
+  protected abstract int runJob(String[] args, TezConfiguration tezConf,
+                                TezClient tezClient) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/9bf99b9a/tez-examples/src/main/java/org/apache/tez/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/WordCount.java b/tez-examples/src/main/java/org/apache/tez/examples/WordCount.java
index aadd0e7..e263ae0 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/WordCount.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/WordCount.java
@@ -20,15 +20,13 @@ package org.apache.tez.examples;
 import java.io.IOException;
 import java.util.StringTokenizer;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.DAG;
@@ -38,8 +36,6 @@ import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
@@ -48,24 +44,24 @@ import org.apache.tez.runtime.library.api.KeyValueReader;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
 import org.apache.tez.runtime.library.api.KeyValuesReader;
 import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
-
-import com.google.common.base.Preconditions;
-
 import org.apache.tez.runtime.library.partitioner.HashPartitioner;
 import org.apache.tez.runtime.library.processor.SimpleProcessor;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Simple example to perform WordCount using Tez API's. WordCount is the 
  * HelloWorld program of distributed data processing and counts the number
  * of occurrences of a word in a distributed text data set.
  */
-public class WordCount extends Configured implements Tool {
+public class WordCount extends TezExampleBase {
 
   static String INPUT = "Input";
   static String OUTPUT = "Output";
   static String TOKENIZER = "Tokenizer";
   static String SUMMATION = "Summation";
-  
+  private static final Log LOG = LogFactory.getLog(WordCount.class);
+
   /*
    * Example code to write a processor in Tez.
    * Processors typically apply the main application logic to the data.
@@ -198,65 +194,27 @@ public class WordCount extends Configured implements Tool {
     return dag;  
   }
 
-  private static void printUsage() {
+  @Override
+  protected void printUsage() {
     System.err.println("Usage: " + " wordcount in out [numPartitions]");
     ToolRunner.printGenericCommandUsage(System.err);
   }
 
-  public boolean run(String inputPath, String outputPath, Configuration conf,
-      int numPartitions) throws Exception {
-    System.out.println("Running WordCount");
-    TezConfiguration tezConf;
-    if (conf != null) {
-      tezConf = new TezConfiguration(conf);
-    } else {
-      tezConf = new TezConfiguration();
-    }
-    
-    UserGroupInformation.setConfiguration(tezConf);
-
-    // Create the TezClient to submit the DAG. Pass the tezConf that has all necessary global and 
-    // dag specific configurations
-    TezClient tezClient = TezClient.create("WordCount", tezConf);
-    // TezClient must be started before it can be used
-    tezClient.start();
-
-    try {
-        DAG dag = createDAG(tezConf, inputPath, outputPath, numPartitions);
-
-        // check that the execution environment is ready
-        tezClient.waitTillReady();
-        // submit the dag and receive a dag client to monitor the progress
-        DAGClient dagClient = tezClient.submitDAG(dag);
-
-        // monitor the progress and wait for completion. This method blocks until the dag is done.
-        DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(null);
-        // check success or failure and print diagnostics
-        if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
-          System.out.println("WordCount failed with diagnostics: " + dagStatus.getDiagnostics());
-          return false;
-        }
-        return true;
-    } finally {
-      // stop the client to perform cleanup
-      tezClient.stop();
-    }
-  }
-
   @Override
-  public int run(String[] args) throws Exception {
-    Configuration conf = getConf();
-    String [] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+  protected int validateArgs(String[] otherArgs) {
     if (otherArgs.length < 2 || otherArgs.length > 3) {
-      printUsage();
       return 2;
     }
-    WordCount job = new WordCount();
-    if (job.run(otherArgs[0], otherArgs[1], conf,
-        (otherArgs.length == 3 ? Integer.parseInt(otherArgs[2]) : 1))) {
-      return 0;
-    }
-    return 1;
+    return 0;
+  }
+
+  @Override
+  protected int runJob(String[] args, TezConfiguration tezConf,
+      TezClient tezClient) throws Exception {
+    DAG dag = createDAG(tezConf, args[0], args[1],
+        args.length == 3 ? Integer.parseInt(args[2]) : 1);
+    LOG.info("Running WordCount");
+    return runDag(dag, false, LOG);
   }
 
   public static void main(String[] args) throws Exception {

http://git-wip-us.apache.org/repos/asf/tez/blob/9bf99b9a/tez-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tez-tests/pom.xml b/tez-tests/pom.xml
index 8f2250f..b5e6a72 100644
--- a/tez-tests/pom.xml
+++ b/tez-tests/pom.xml
@@ -45,7 +45,6 @@
     <dependency>
       <groupId>org.apache.tez</groupId>
       <artifactId>tez-examples</artifactId>
-      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.tez</groupId>

http://git-wip-us.apache.org/repos/asf/tez/blob/9bf99b9a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastLoadGen.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastLoadGen.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastLoadGen.java
index bdea1ac..d72cf22 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastLoadGen.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastLoadGen.java
@@ -36,6 +36,7 @@ import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.examples.TezExampleBase;
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.ProcessorContext;
 import org.apache.tez.runtime.library.api.KeyValueReader;

http://git-wip-us.apache.org/repos/asf/tez/blob/9bf99b9a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RPCLoadGen.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RPCLoadGen.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RPCLoadGen.java
index 6776a72..67cf328 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RPCLoadGen.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RPCLoadGen.java
@@ -45,6 +45,7 @@ import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.examples.TezExampleBase;
 import org.apache.tez.runtime.api.ProcessorContext;
 import org.apache.tez.runtime.library.processor.SimpleProcessor;
 import sun.misc.IOUtils;

http://git-wip-us.apache.org/repos/asf/tez/blob/9bf99b9a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TezExampleBase.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TezExampleBase.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TezExampleBase.java
deleted file mode 100644
index 912de6f..0000000
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TezExampleBase.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.mapreduce.examples;
-
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.util.Set;
-
-import com.google.common.collect.Sets;
-import org.apache.commons.logging.Log;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.util.Tool;
-import org.apache.tez.client.TezClient;
-import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.dag.api.client.StatusGetOpts;
-
-public abstract class TezExampleBase extends Configured implements Tool {
-
-  private TezClient tezClientInternal;
-
-  @Override
-  public final int run(String[] args) throws Exception {
-    Configuration conf = getConf();
-    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
-    return _execute(otherArgs, null, null);
-  }
-
-  /**
-   * Utility method to use the example from within code or a test.
-   *
-   * @param conf      the tez configuration instance which will be used to crate the DAG and
-   *                  possible the Tez Client.
-   * @param args      arguments to the example
-   * @param tezClient an existing running {@link org.apache.tez.client.TezClient} instance if one
-   *                  exists. If no TezClient is specified (null), one will be created based on the
-   *                  provided configuration
-   * @return Zero indicates success, non-zero indicates failure
-   * @throws IOException
-   * @throws TezException
-   */
-  public int run(TezConfiguration conf, String[] args, @Nullable TezClient tezClient) throws
-      IOException,
-      TezException, InterruptedException {
-    setConf(conf);
-    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
-    return _execute(otherArgs, conf, tezClient);
-  }
-
-  /**
-   * @param dag           the dag to execute
-   * @param printCounters whether to print counters or not
-   * @param logger        the logger to use while printing diagnostics
-   * @return Zero indicates success, non-zero indicates failure
-   * @throws TezException
-   * @throws InterruptedException
-   * @throws IOException
-   */
-  public int runDag(DAG dag, boolean printCounters, Log logger) throws TezException,
-      InterruptedException, IOException {
-    tezClientInternal.waitTillReady();
-    DAGClient dagClient = tezClientInternal.submitDAG(dag);
-    Set<StatusGetOpts> getOpts = Sets.newHashSet();
-    if (printCounters) {
-      getOpts.add(StatusGetOpts.GET_COUNTERS);
-    }
-
-    DAGStatus dagStatus;
-    dagStatus = dagClient.waitForCompletionWithStatusUpdates(getOpts);
-
-    if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
-      logger.info("DAG diagnostics: " + dagStatus.getDiagnostics());
-      return -1;
-    }
-    return 0;
-  }
-
-  private int _validateArgs(String[] args) {
-    int res = validateArgs(args);
-    if (res != 0) {
-      printUsage();
-      return res;
-    }
-    return 0;
-  }
-
-  private int _execute(String[] otherArgs, TezConfiguration tezConf, TezClient tezClient) throws
-      IOException, TezException, InterruptedException {
-
-    int result = _validateArgs(otherArgs);
-    if (result != 0) {
-      return result;
-    }
-
-    if (tezConf == null) {
-      tezConf = new TezConfiguration(getConf());
-    }
-    UserGroupInformation.setConfiguration(tezConf);
-    boolean ownTezClient = false;
-    if (tezClient == null) {
-      ownTezClient = true;
-      tezClientInternal = createTezClient(tezConf);
-    }
-    try {
-      return runJob(otherArgs, tezConf, tezClientInternal);
-    } finally {
-      if (ownTezClient && tezClientInternal != null) {
-        tezClientInternal.stop();
-      }
-    }
-  }
-
-  private TezClient createTezClient(TezConfiguration tezConf) throws IOException, TezException {
-    TezClient tezClient = TezClient.create(getClass().getSimpleName(), tezConf);
-    tezClient.start();
-    return tezClient;
-  }
-
-  /**
-   * Print usage instructions for this example
-   */
-  protected abstract void printUsage();
-
-  /**
-   * Validate the arguments
-   *
-   * @param otherArgs arguments, if any
-   * @return Zero indicates success, non-zero indicates failure
-   */
-  protected abstract int validateArgs(String[] otherArgs);
-
-  /**
-   * Create and execute the actual DAG for the example
-   *
-   * @param args      arguments for execution
-   * @param tezConf   the tez configuration instance to be used while processing the DAG
-   * @param tezClient the tez client instance to use to run the DAG if any custom monitoring is
-   *                  required. Otherwise the utility method {@link #runDag(org.apache.tez.dag.api.DAG,
-   *                  boolean, org.apache.commons.logging.Log)} should be used
-   * @return Zero indicates success, non-zero indicates failure
-   * @throws IOException
-   * @throws TezException
-   */
-  protected abstract int runJob(String[] args, TezConfiguration tezConf,
-                                TezClient tezClient) throws IOException, TezException,
-      InterruptedException;
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/9bf99b9a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
index dd8dba9..c1232cd 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.tez.client.TezClient;
@@ -449,7 +450,7 @@ public class TestTezJobs {
     try {
 
       OrderedWordCount job = new OrderedWordCount();
-      Assert.assertTrue("OrderedWordCount failed", job.run(inputDirStr, outputDirStr, tezConf, 2));
+      Assert.assertTrue("OrderedWordCount failed", job.run(tezConf, new String[]{inputDirStr, outputDirStr, "2"}, null)==0);
       verifyOutput(outputDir);
 
     } finally {
@@ -496,7 +497,11 @@ public class TestTezJobs {
       int appsBeforeCount = apps != null ? apps.size() : 0;
 
       SimpleSessionExample job = new SimpleSessionExample();
-      Assert.assertTrue("SimpleSessionExample failed", job.run(inputPaths, outputPaths, tezConf, 2));
+      tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true);
+      Assert.assertTrue(
+          "SimpleSessionExample failed",
+          job.run(tezConf, new String[] { StringUtils.join(",", inputPaths),
+              StringUtils.join(",", outputPaths), "2" }, null) == 0);
 
       for (int i=0; i<numIterations; ++i) {
         verifyOutput(outputDirs[i]);


Mime
View raw message