tez-commits mailing list archives

From bi...@apache.org
Subject [04/10] TEZ-1055. Rename tez-mapreduce-examples to tez-examples (Hitesh Shah via bikas)
Date Sat, 16 Aug 2014 00:54:52 GMT
http://git-wip-us.apache.org/repos/asf/tez/blob/41f5cd8a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/GroupByOrderByMRRTest.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/GroupByOrderByMRRTest.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/GroupByOrderByMRRTest.java
new file mode 100644
index 0000000..939bea0
--- /dev/null
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/GroupByOrderByMRRTest.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.examples;
+
+import java.io.IOException;
+import java.util.StringTokenizer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.client.MRTezClient;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
+
+/**
+ * Simple example that does a GROUP BY ORDER BY in an MRR job.
+ * Consider a query such as:
+ * SELECT DeptName, COUNT(*) AS cnt FROM EmployeeTable
+ * GROUP BY DeptName ORDER BY cnt;
+ *
+ * i.e. list all departments with the count of employees in each department,
+ * ordered by the department's employee count.
+ *
+ *  Requires an input file containing 2 strings per line in the format
+ *  <EmployeeName> <DeptName>
+ *
+ *  For example, use the following:
+ *
+ *  #!/bin/bash
+ *
+ *  i=1000000
+ *  j=1000
+ *
+ *  id=0
+ *  while [[ "$id" -ne "$i" ]]
+ *  do
+ *    id=`expr $id + 1`
+ *    deptId=`expr $RANDOM % $j + 1`
+ *    deptName=`echo "ibase=10;obase=16;$deptId" | bc`
+ *    echo "$id O$deptName"
+ *  done
+ *
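+ *  The final output (illustrative values, one line per department) gives the
+ *  department name and its employee count, ordered by count, e.g.:
+ *    O1A    972
+ *    O3E8   1034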
+ */
+public class GroupByOrderByMRRTest extends Configured implements Tool {
+
+  private static final Log LOG = LogFactory.getLog(GroupByOrderByMRRTest.class);
+
+  /**
+   * The mapper takes a single input line containing an employee name and a
+   * department name, and emits the department name with a count of 1.
+   */
+  public static class MyMapper
+      extends Mapper<Object, Text, Text, IntWritable> {
+
+    private final static IntWritable one = new IntWritable(1);
+    private final static Text word = new Text();
+
+    public void map(Object key, Text value, Context context
+        ) throws IOException, InterruptedException {
+      StringTokenizer itr = new StringTokenizer(value.toString());
+      String empName = "";
+      String deptName = "";
+      if (itr.hasMoreTokens()) {
+        empName = itr.nextToken();
+        if (itr.hasMoreTokens()) {
+          deptName = itr.nextToken();
+        }
+        if (!empName.isEmpty()
+            && !deptName.isEmpty()) {
+          word.set(deptName);
+          context.write(word, one);
+        }
+      }
+    }
+  }
+
+  /**
+   * The intermediate reducer aggregates the total count per department.
+   * It takes a department name and its counts as input, and emits the count as
+   * the key and the department name as the value so the next stage sorts by count.
+   */
+  public static class MyGroupByReducer
+      extends Reducer<Text, IntWritable, IntWritable, Text> {
+    private IntWritable result = new IntWritable();
+
+    public void reduce(Text key, Iterable<IntWritable> values,
+        Context context
+        ) throws IOException, InterruptedException {
+
+      int sum = 0;
+      for (IntWritable val : values) {
+        sum += val.get();
+      }
+      result.set(sum);
+      context.write(result, key);
+    }
+  }
+
+  /**
+   * The shuffle ensures ordering based on the employee count per department,
+   * so the final reducer is a no-op that simply emits the department name
+   * with its employee count.
+   */
+  public static class MyOrderByNoOpReducer
+      extends Reducer<IntWritable, Text, Text, IntWritable> {
+
+    public void reduce(IntWritable key, Iterable<Text> values,
+        Context context
+        ) throws IOException, InterruptedException {
+      for (Text word : values) {
+        context.write(word, key);
+      }
+    }
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    Configuration conf = getConf();
+
+    // Configure intermediate reduces
+    conf.setInt(MRJobConfig.MRR_INTERMEDIATE_STAGES, 1);
+
+    // Set reducer class for intermediate reduce
+    conf.setClass(MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(1,
+        "mapreduce.job.reduce.class"), MyGroupByReducer.class, Reducer.class);
+    // Set reducer output key class
+    conf.setClass(MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(1,
+        "mapreduce.map.output.key.class"), IntWritable.class, Object.class);
+    // Set reducer output value class
+    conf.setClass(MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(1,
+        "mapreduce.map.output.value.class"), Text.class, Object.class);
+    conf.setInt(MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(1,
+        "mapreduce.job.reduces"), 2);
+
+    String[] otherArgs = new GenericOptionsParser(conf, args).
+        getRemainingArgs();
+    if (otherArgs.length != 2) {
+      System.err.println("Usage: groupbyorderbymrrtest <in> <out>");
+      ToolRunner.printGenericCommandUsage(System.err);
+      return 2;
+    }
+
+    @SuppressWarnings("deprecation")
+    Job job = new Job(conf, "groupbyorderbymrrtest");
+
+    job.setJarByClass(GroupByOrderByMRRTest.class);
+
+    // Configure map
+    job.setMapperClass(MyMapper.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(IntWritable.class);
+
+    // Configure reduce
+    job.setReducerClass(MyOrderByNoOpReducer.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(IntWritable.class);
+    job.setNumReduceTasks(1);
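+    // Resulting MRR pipeline: MyMapper -> MyGroupByReducer (2 intermediate
+    // reduce tasks, configured above) -> MyOrderByNoOpReducer (1 final reduce task).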
+
+    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
+    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
+
+    job.submit();
+    JobID jobId = job.getJobID();
+    ApplicationId appId = TypeConverter.toYarn(jobId).getAppId();
+
+    DAGClient dagClient = MRTezClient.getDAGClient(appId, new TezConfiguration(conf), null);
+    DAGStatus dagStatus;
+    String[] vNames = { "initialmap" , "ireduce1" , "finalreduce" };
+    while (true) {
+      dagStatus = dagClient.getDAGStatus(null);
+      if(dagStatus.getState() == DAGStatus.State.RUNNING ||
+         dagStatus.getState() == DAGStatus.State.SUCCEEDED ||
+         dagStatus.getState() == DAGStatus.State.FAILED ||
+         dagStatus.getState() == DAGStatus.State.KILLED ||
+         dagStatus.getState() == DAGStatus.State.ERROR) {
+        break;
+      }
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException e) {
+        // continue;
+      }
+    }
+
+    while (dagStatus.getState() == DAGStatus.State.RUNNING) {
+      try {
+        ExampleDriver.printDAGStatus(dagClient, vNames);
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+          // continue;
+        }
+        dagStatus = dagClient.getDAGStatus(null);
+      } catch (TezException e) {
+        LOG.fatal("Failed to get application progress. Exiting");
+        return -1;
+      }
+    }
+
+    ExampleDriver.printDAGStatus(dagClient, vNames);
+    LOG.info("Application completed. " + "FinalState=" + dagStatus.getState());
+    return dagStatus.getState() == DAGStatus.State.SUCCEEDED ? 0 : 1;
+  }
+
+  public static void main(String[] args) throws Exception {
+    Configuration configuration = new Configuration();
+    GroupByOrderByMRRTest groupByOrderByMRRTest = new GroupByOrderByMRRTest();
+    int status = ToolRunner.run(configuration, groupByOrderByMRRTest, args);
+    System.exit(status);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/41f5cd8a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
new file mode 100644
index 0000000..711240c
--- /dev/null
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
@@ -0,0 +1,350 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.examples;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+
+import com.google.common.base.Preconditions;
+
+public class IntersectDataGen extends Configured implements Tool {
+
+  private static final Log LOG = LogFactory.getLog(IntersectDataGen.class);
+
+  private static final String STREAM_OUTPUT_NAME = "streamoutput";
+  private static final String HASH_OUTPUT_NAME = "hashoutput";
+  private static final String EXPECTED_OUTPUT_NAME = "expectedoutput";
+
+  public static void main(String[] args) throws Exception {
+    IntersectDataGen dataGen = new IntersectDataGen();
+    int status = ToolRunner.run(new Configuration(), dataGen, args);
+    System.exit(status);
+  }
+
+  private static void printUsage() {
+    System.err
+        .println("Usage: "
+            + "intersectdatagen <outPath1> <path1Size> <outPath2> <path2Size> <expectedResultPath> <numTasks>");
+    ToolRunner.printGenericCommandUsage(System.err);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    Configuration conf = getConf();
+    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+    int result = validateArgs(otherArgs);
+    if (result != 0) {
+      return result;
+    }
+    return execute(otherArgs);
+  }
+  
+  public int run(Configuration conf, String[] args, TezClient tezSession) throws Exception {
+    setConf(conf);
+    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+    int result = validateArgs(otherArgs);
+    if (result != 0) {
+      return result;
+    }
+    return execute(otherArgs, tezSession);
+  }
+  
+  private int validateArgs(String[] otherArgs) {
+    if (otherArgs.length != 6) {
+      printUsage();
+      return 2;
+    }
+    return 0;
+  }
+
+  private int execute(String [] args) throws TezException, IOException, InterruptedException {
+    TezConfiguration tezConf = new TezConfiguration(getConf());
+    TezClient tezSession = null;
+    try {
+      tezSession = createTezSession(tezConf);
+      return execute(args, tezConf, tezSession);
+    } finally {
+      if (tezSession != null) {
+        tezSession.stop();
+      }
+    }
+  }
+  
+  private int execute(String[] args, TezClient tezSession) throws IOException, TezException,
+      InterruptedException {
+    TezConfiguration tezConf = new TezConfiguration(getConf());
+    return execute(args, tezConf, tezSession);
+  }
+  
+  private TezClient createTezSession(TezConfiguration tezConf) throws TezException, IOException {
+    TezClient tezSession = new TezClient("IntersectDataGenSession", tezConf);
+    tezSession.start();
+    return tezSession;
+  }
+  
+  private int execute(String[] args, TezConfiguration tezConf, TezClient tezSession)
+      throws IOException, TezException, InterruptedException {
+    LOG.info("Running IntersectDataGen");
+
+    UserGroupInformation.setConfiguration(tezConf);
+
+    String outDir1 = args[0];
+    long outDir1Size = Long.parseLong(args[1]);
+    String outDir2 = args[2];
+    long outDir2Size = Long.parseLong(args[3]);
+    String expectedOutputDir = args[4];
+    int numTasks = Integer.parseInt(args[5]);
+
+    Path largeOutPath = null;
+    Path smallOutPath = null;
+    long largeOutSize = 0;
+    long smallOutSize = 0;
+
+    if (outDir1Size >= outDir2Size) {
+      largeOutPath = new Path(outDir1);
+      largeOutSize = outDir1Size;
+      smallOutPath = new Path(outDir2);
+      smallOutSize = outDir2Size;
+    } else {
+      largeOutPath = new Path(outDir2);
+      largeOutSize = outDir2Size;
+      smallOutPath = new Path(outDir1);
+      smallOutSize = outDir1Size;
+    }
+
+    Path expectedOutputPath = new Path(expectedOutputDir);
+
+    // Verify output path existence
+    FileSystem fs = FileSystem.get(tezConf);
+    int res = 0;
+    res = checkOutputDirectory(fs, largeOutPath) + checkOutputDirectory(fs, smallOutPath)
+        + checkOutputDirectory(fs, expectedOutputPath);
+    if (res != 0) {
+      return 3;
+    }
+
+    if (numTasks <= 0) {
+      System.err.println("NumTasks must be > 0");
+      return 4;
+    }
+
+    DAG dag = createDag(tezConf, largeOutPath, smallOutPath, expectedOutputPath, numTasks,
+        largeOutSize, smallOutSize);
+
+    tezSession.waitTillReady();
+    DAGClient dagClient = tezSession.submitDAG(dag);
+    DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(null);
+    if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
+      LOG.info("DAG diagnostics: " + dagStatus.getDiagnostics());
+      return -1;
+    }
+    return 0;
+
+  }
+
+  private DAG createDag(TezConfiguration tezConf, Path largeOutPath, Path smallOutPath,
+      Path expectedOutputPath, int numTasks, long largeOutSize, long smallOutSize)
+      throws IOException {
+
+    long largeOutSizePerTask = largeOutSize / numTasks;
+    long smallOutSizePerTask = smallOutSize / numTasks;
+
+    DAG dag = new DAG("IntersectDataGen");
+
+    Vertex genDataVertex = new Vertex("datagen", new ProcessorDescriptor(
+        GenDataProcessor.class.getName()).setUserPayload(
+        new UserPayload(GenDataProcessor.createConfiguration(largeOutSizePerTask, smallOutSizePerTask))), numTasks);
+    genDataVertex.addDataSink(STREAM_OUTPUT_NAME, 
+        MROutput.createConfigurer(new Configuration(tezConf),
+            TextOutputFormat.class, largeOutPath.toUri().toString()).create());
+    genDataVertex.addDataSink(HASH_OUTPUT_NAME, 
+        MROutput.createConfigurer(new Configuration(tezConf),
+            TextOutputFormat.class, smallOutPath.toUri().toString()).create());
+    genDataVertex.addDataSink(EXPECTED_OUTPUT_NAME, 
+        MROutput.createConfigurer(new Configuration(tezConf),
+            TextOutputFormat.class, expectedOutputPath.toUri().toString()).create());
+
+    dag.addVertex(genDataVertex);
+
+    return dag;
+  }
+
+  public static class GenDataProcessor extends SimpleMRProcessor {
+
+    private static final Log LOG = LogFactory.getLog(GenDataProcessor.class);
+
+    long streamOutputFileSize;
+    long hashOutputFileSize;
+    float overlapApprox = 0.2f;
+
+    public GenDataProcessor(ProcessorContext context) {
+      super(context);
+    }
+
+    public static byte[] createConfiguration(long streamOutputFileSize, long hashOutputFileSize)
+        throws IOException {
+      ByteArrayOutputStream bos = new ByteArrayOutputStream();
+      DataOutputStream dos = new DataOutputStream(bos);
+      dos.writeLong(streamOutputFileSize);
+      dos.writeLong(hashOutputFileSize);
+      dos.close();
+      bos.close();
+      return bos.toByteArray();
+    }
+
+    @Override
+    public void initialize() throws Exception {
+      byte[] payload = getContext().getUserPayload().getPayload();
+      ByteArrayInputStream bis = new ByteArrayInputStream(payload);
+      DataInputStream dis = new DataInputStream(bis);
+      streamOutputFileSize = dis.readLong();
+      hashOutputFileSize = dis.readLong();
+      LOG.info("Initialized with largeFileTargetSize=" + streamOutputFileSize
+          + ", smallFileTragetSize=" + hashOutputFileSize);
+      dis.close();
+      bis.close();
+    }
+
+    @Override
+    public void run() throws Exception {
+      Preconditions.checkState(getInputs().size() == 0);
+      Preconditions.checkState(getOutputs().size() == 3);
+
+      KeyValueWriter streamOutputWriter = (KeyValueWriter) getOutputs().get(STREAM_OUTPUT_NAME)
+          .getWriter();
+      KeyValueWriter hashOutputWriter = (KeyValueWriter) getOutputs().get(HASH_OUTPUT_NAME)
+          .getWriter();
+      KeyValueWriter expectedOutputWriter = (KeyValueWriter) getOutputs().get(EXPECTED_OUTPUT_NAME)
+          .getWriter();
+
+      float fileSizeFraction = hashOutputFileSize / (float) streamOutputFileSize;
+      Preconditions.checkState(fileSizeFraction > 0.0f && fileSizeFraction <= 1.0f);
+      int mod = 1;
+      int extraKeysMod = 0;
+      if (fileSizeFraction > overlapApprox) {
+        // Common keys capped by overlap. Additional ones required in the hashFile.
+        mod = (int) (1 / overlapApprox);
+        extraKeysMod = (int) (1 / (fileSizeFraction - overlapApprox));
+      } else {
+        // All keys in hashFile must exist in stream file.
+        mod = (int) (1 / fileSizeFraction);
+      }
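+      // Example (hypothetical sizes, not taken from the payload): with
+      // overlapApprox = 0.2 and fileSizeFraction = 0.5, mod = 5 and
+      // extraKeysMod = (int) (1 / 0.3) = 3, so roughly every 5th stream key is
+      // also written to the hash file and every 3rd iteration adds a hash-only key.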
+      LOG.info("Using mod=" + mod + ", extraKeysMod=" + extraKeysMod);
+
+      long count = 0;
+      long sizeLarge = 0;
+      long sizeSmall = 0;
+      long numLargeFileKeys = 0;
+      long numSmallFileKeys = 0;
+      long numExpectedKeys = 0;
+      while (sizeLarge < streamOutputFileSize) {
+        String str = createOverlapString(13, count);
+        Text text = new Text(str);
+        int size = text.getLength();
+        streamOutputWriter.write(text, NullWritable.get());
+        sizeLarge += size;
+        numLargeFileKeys++;
+        if (count % mod == 0) {
+          hashOutputWriter.write(text, NullWritable.get());
+          sizeSmall += size;
+          numSmallFileKeys++;
+          expectedOutputWriter.write(text, NullWritable.get());
+          numExpectedKeys++;
+        }
+        if (extraKeysMod != 0 && count % extraKeysMod == 0) {
+          String nStr = createNonOverlapString(13, count);
+          Text nText = new Text(nStr);
+          hashOutputWriter.write(nText, NullWritable.get());
+          sizeSmall += nText.getLength();
+          numSmallFileKeys++;
+        }
+        count++;
+      }
+      LOG.info("OutputStats: " + "largeFileNumKeys=" + numLargeFileKeys + ", smallFileNumKeys="
+          + numSmallFileKeys + ", expFileNumKeys=" + numExpectedKeys + ", largeFileSize="
+          + sizeLarge + ", smallFileSize=" + sizeSmall);
+    }
+
+    private String createOverlapString(int size, long count) {
+      StringBuilder sb = new StringBuilder();
+      Random random = new Random();
+      for (int i = 0; i < size; i++) {
+        int r = Math.abs(random.nextInt()) % 26;
+        // Random a-z followed by the count
+        sb.append((char) (97 + r));
+      }
+      sb.append("_").append(getContext().getTaskIndex()).append("_").append(count);
+      return sb.toString();
+    }
+
+    private String createNonOverlapString(int size, long count) {
+      StringBuilder sb = new StringBuilder();
+      Random random = new Random();
+      for (int i = 0; i < size; i++) {
+        int r = Math.abs(random.nextInt()) % 26;
+        // Random A-Z followed by the count
+        sb.append((char) (65 + r));
+      }
+      sb.append("_").append(getContext().getTaskIndex()).append("_").append(count);
+      return sb.toString();
+    }
+
+  }
+
+  private int checkOutputDirectory(FileSystem fs, Path path) throws IOException {
+    if (fs.exists(path)) {
+      System.err.println("Output directory: " + path + " already exists");
+      return 2;
+    }
+    return 0;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/41f5cd8a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
new file mode 100644
index 0000000..da18014
--- /dev/null
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
@@ -0,0 +1,281 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.examples;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.Reader;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfigurer;
+import org.apache.tez.runtime.library.partitioner.HashPartitioner;
+import org.apache.tez.runtime.library.processor.SimpleProcessor;
+
+import com.google.common.base.Preconditions;
+
+public class IntersectExample extends Configured implements Tool {
+
+  private static final Log LOG = LogFactory.getLog(IntersectExample.class);
+
+  public static void main(String[] args) throws Exception {
+    IntersectExample intersect = new IntersectExample();
+    int status = ToolRunner.run(new Configuration(), intersect, args);
+    System.exit(status);
+  }
+
+  private static void printUsage() {
+    System.err.println("Usage: " + "intersect <file1> <file2> <numPartitions> <outPath>");
+    ToolRunner.printGenericCommandUsage(System.err);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    Configuration conf = getConf();
+    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+    int result = validateArgs(otherArgs);
+    if (result != 0) {
+      return result;
+    }
+    return execute(otherArgs);
+  }
+  
+  public int run(Configuration conf, String[] args, TezClient tezSession) throws Exception {
+    setConf(conf);
+    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+    int result = validateArgs(otherArgs);
+    if (result != 0) {
+      return result;
+    }
+    return execute(otherArgs, tezSession);
+  }
+  
+  private int validateArgs(String[] otherArgs) {
+    if (otherArgs.length != 4) {
+      printUsage();
+      return 2;
+    }
+    return 0;
+  }
+
+  private int execute(String[] args) throws TezException, IOException, InterruptedException {
+    TezConfiguration tezConf = new TezConfiguration(getConf());
+    TezClient tezSession = null;
+    try {
+      tezSession = createTezSession(tezConf);
+      return execute(args, tezConf, tezSession);
+    } finally {
+      if (tezSession != null) {
+        tezSession.stop();
+      }
+    }
+  }
+  
+  private int execute(String[] args, TezClient tezSession) throws IOException, TezException,
+      InterruptedException {
+    TezConfiguration tezConf = new TezConfiguration(getConf());
+    return execute(args, tezConf, tezSession);
+  }
+  
+  private TezClient createTezSession(TezConfiguration tezConf) throws TezException, IOException {
+    TezClient tezSession = new TezClient("IntersectExampleSession", tezConf);
+    tezSession.start();
+    return tezSession;
+  }
+  
+  private int execute(String[] args, TezConfiguration tezConf, TezClient tezSession)
+      throws IOException, TezException, InterruptedException {
+    LOG.info("Running IntersectExample");
+
+    UserGroupInformation.setConfiguration(tezConf);
+
+    String streamInputDir = args[0];
+    String hashInputDir = args[1];
+    int numPartitions = Integer.parseInt(args[2]);
+    String outputDir = args[3];
+
+    Path streamInputPath = new Path(streamInputDir);
+    Path hashInputPath = new Path(hashInputDir);
+    Path outputPath = new Path(outputDir);
+
+    // Verify output path existence
+    FileSystem fs = FileSystem.get(tezConf);
+    if (fs.exists(outputPath)) {
+      System.err.println("Output directory: " + outputDir + " already exists");
+      return 3;
+    }
+    if (numPartitions <= 0) {
+      System.err.println("NumPartitions must be > 0");
+      return 4;
+    }
+
+    DAG dag = createDag(tezConf, streamInputPath, hashInputPath, outputPath, numPartitions);
+
+    tezSession.waitTillReady();
+    DAGClient dagClient = tezSession.submitDAG(dag);
+    DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(null);
+    if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
+      LOG.info("DAG diagnostics: " + dagStatus.getDiagnostics());
+      return -1;
+    }
+    return 0;
+
+  }
+
+  private DAG createDag(TezConfiguration tezConf, Path streamPath, Path hashPath, Path outPath,
+      int numPartitions) throws IOException {
+    DAG dag = new DAG("IntersectExample");
+
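+    // DAG shape: partitioner1 (stream file) and partitioner2 (hash file) each
+    // repartition their keys and feed the intersect vertex, which emits the
+    // keys present in both inputs.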
+    // Configuration for intermediate output - shared by Vertex1 and Vertex2
+    // This should only be setting selective keys from the underlying conf. Fix after there's a
+    // better mechanism to configure the IOs.
+
+    UnorderedPartitionedKVEdgeConfigurer edgeConf =
+        UnorderedPartitionedKVEdgeConfigurer
+            .newBuilder(Text.class.getName(), NullWritable.class.getName(),
+                HashPartitioner.class.getName()).build();
+
+    // Change the way resources are setup - no MRHelpers
+    Vertex streamFileVertex = new Vertex("partitioner1", new ProcessorDescriptor(
+        ForwardingProcessor.class.getName())).addDataSource(
+        "streamfile",
+        MRInput
+            .createConfigurer(new Configuration(tezConf), TextInputFormat.class,
+                streamPath.toUri().toString()).groupSplits(false).create());
+
+    Vertex hashFileVertex = new Vertex("partitioner2", new ProcessorDescriptor(
+        ForwardingProcessor.class.getName())).addDataSource(
+        "hashfile",
+        MRInput
+            .createConfigurer(new Configuration(tezConf), TextInputFormat.class,
+                hashPath.toUri().toString()).groupSplits(false).create());
+
+    Vertex intersectVertex = new Vertex("intersect", new ProcessorDescriptor(
+        IntersectProcessor.class.getName()), numPartitions).addDataSink("finalOutput",
+        MROutput.createConfigurer(new Configuration(tezConf),
+            TextOutputFormat.class, outPath.toUri().toString()).create());
+
+    Edge e1 = new Edge(streamFileVertex, intersectVertex, edgeConf.createDefaultEdgeProperty());
+
+    Edge e2 = new Edge(hashFileVertex, intersectVertex, edgeConf.createDefaultEdgeProperty());
+
+    dag.addVertex(streamFileVertex).addVertex(hashFileVertex).addVertex(intersectVertex)
+        .addEdge(e1).addEdge(e2);
+    return dag;
+  }
+
+  // private void obtainTokens(Credentials credentials, Path... paths) throws IOException {
+  // TokenCache.obtainTokensForNamenodes(credentials, paths, getConf());
+  // }
+
+  /**
+   * Reads key-values from the source and forwards the value as the key for the output
+   */
+  public static class ForwardingProcessor extends SimpleProcessor {
+    public ForwardingProcessor(ProcessorContext context) {
+      super(context);
+    }
+
+    @Override
+    public void run() throws Exception {
+      Preconditions.checkState(getInputs().size() == 1);
+      Preconditions.checkState(getOutputs().size() == 1);
+      LogicalInput input = getInputs().values().iterator().next();
+      Reader rawReader = input.getReader();
+      Preconditions.checkState(rawReader instanceof KeyValueReader);
+      LogicalOutput output = getOutputs().values().iterator().next();
+
+      KeyValueReader reader = (KeyValueReader) rawReader;
+      KeyValueWriter writer = (KeyValueWriter) output.getWriter();
+
+      while (reader.next()) {
+        Object val = reader.getCurrentValue();
+        writer.write(val, NullWritable.get());
+      }
+    }
+  }
+
+  public static class IntersectProcessor extends SimpleMRProcessor {
+
+    public IntersectProcessor(ProcessorContext context) {
+      super(context);
+    }
+
+    @Override
+    public void run() throws Exception {
+      Preconditions.checkState(getInputs().size() == 2);
+      Preconditions.checkState(getOutputs().size() == 1);
+      LogicalInput streamInput = getInputs().get("partitioner1");
+      LogicalInput hashInput = getInputs().get("partitioner2");
+      Reader rawStreamReader = streamInput.getReader();
+      Reader rawHashReader = hashInput.getReader();
+      Preconditions.checkState(rawStreamReader instanceof KeyValueReader);
+      Preconditions.checkState(rawHashReader instanceof KeyValueReader);
+      LogicalOutput lo = getOutputs().values().iterator().next();
+      Preconditions.checkState(lo instanceof MROutput);
+      MROutput output = (MROutput) lo;
+      KeyValueWriter writer = output.getWriter();
+
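+      // Partitioned hash join on keys: load this partition of the hash-side
+      // input into an in-memory set, then stream the other input and emit
+      // every key present in the set.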
+      KeyValueReader hashKvReader = (KeyValueReader) rawHashReader;
+      Set<Text> keySet = new HashSet<Text>();
+      while (hashKvReader.next()) {
+        keySet.add(new Text((Text) hashKvReader.getCurrentKey()));
+      }
+
+      KeyValueReader streamKvReader = (KeyValueReader) rawStreamReader;
+      while (streamKvReader.next()) {
+        Text key = (Text) streamKvReader.getCurrentKey();
+        if (keySet.contains(key)) {
+          writer.write(key, NullWritable.get());
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/41f5cd8a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
new file mode 100644
index 0000000..c5328c7
--- /dev/null
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.examples;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.StatusGetOpts;
+import org.apache.tez.mapreduce.examples.IntersectExample.ForwardingProcessor;
+import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.Reader;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
+import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfigurer;
+import org.apache.tez.runtime.library.partitioner.HashPartitioner;
+import org.apache.tez.runtime.library.processor.SimpleProcessor;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+
+public class IntersectValidate extends Configured implements Tool {
+  private static final Log LOG = LogFactory.getLog(IntersectValidate.class);
+
+  private static final String LHS_INPUT_NAME = "lhsfile";
+  private static final String RHS_INPUT_NAME = "rhsfile";
+
+  private static final String COUNTER_GROUP_NAME = "INTERSECT_VALIDATE";
+  private static final String MISSING_KEY_COUNTER_NAME = "MISSING_KEY_EXISTS";
+
+  public static void main(String[] args) throws Exception {
+    IntersectValidate validate = new IntersectValidate();
+    int status = ToolRunner.run(new Configuration(), validate, args);
+    System.exit(status);
+  }
+
+  private static void printUsage() {
+    System.err.println("Usage: " + "intersectvalidate <path1> <path2>");
+    ToolRunner.printGenericCommandUsage(System.err);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    Configuration conf = getConf();
+    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+    int result = validateArgs(otherArgs);
+    if (result != 0) {
+      return result;
+    }
+    return execute(otherArgs);
+  }
+  
+  public int run(Configuration conf, String[] args, TezClient tezSession) throws Exception {
+    setConf(conf);
+    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+    int result = validateArgs(otherArgs);
+    if (result != 0) {
+      return result;
+    }
+    return execute(otherArgs, tezSession);
+  } 
+
+  private int validateArgs(String[] otherArgs) {
+    if (otherArgs.length != 3 && otherArgs.length != 2) {
+      printUsage();
+      return 2;
+    }
+    return 0;
+  }
+
+  private int execute(String[] args) throws TezException, IOException, InterruptedException {
+    TezConfiguration tezConf = new TezConfiguration(getConf());
+    TezClient tezSession = null;
+    try {
+      tezSession = createTezSession(tezConf);
+      return execute(args, tezConf, tezSession);
+    } finally {
+      if (tezSession != null) {
+        tezSession.stop();
+      }
+    }
+  }
+  
+  private int execute(String[] args, TezClient tezSession) throws IOException, TezException,
+      InterruptedException {
+    TezConfiguration tezConf = new TezConfiguration(getConf());
+    return execute(args, tezConf, tezSession);
+  }
+  
+  private TezClient createTezSession(TezConfiguration tezConf) throws TezException, IOException {
+    TezClient tezSession = new TezClient("IntersectValidateSession", tezConf);
+    tezSession.start();
+    return tezSession;
+  }
+
+  private int execute(String[] args, TezConfiguration tezConf, TezClient tezSession)
+      throws IOException, TezException, InterruptedException {
+    LOG.info("Running IntersectValidate");
+    UserGroupInformation.setConfiguration(tezConf);
+
+    String lhsDir = args[0];
+    String rhsDir = args[1];
+    int numPartitions = 1;
+    if (args.length == 3) {
+      numPartitions = Integer.parseInt(args[2]);
+    }
+
+    if (numPartitions <= 0) {
+      System.err.println("NumPartitions must be > 0");
+      return 4;
+    }
+
+    Path lhsPath = new Path(lhsDir);
+    Path rhsPath = new Path(rhsDir);
+
+    DAG dag = createDag(tezConf, lhsPath, rhsPath, numPartitions);
+
+    tezSession.waitTillReady();
+    DAGClient dagClient = tezSession.submitDAG(dag);
+    DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(null);
+    if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
+      LOG.info("DAG diagnostics: " + dagStatus.getDiagnostics());
+      return -1;
+    } else {
+      dagStatus = dagClient.getDAGStatus(Sets.newHashSet(StatusGetOpts.GET_COUNTERS));
+      TezCounter counter = dagStatus.getDAGCounters().findCounter(COUNTER_GROUP_NAME,
+          MISSING_KEY_COUNTER_NAME);
+      if (counter == null) {
+        LOG.info("Unable to determing equality");
+        return -2;
+      } else {
+        if (counter.getValue() != 0) {
+          LOG.info("Validate failed. The two sides are not equivalent");
+          return -3;
+        } else {
+          LOG.info("Validation successful. The two sides are equivalent");
+          return 0;
+        }
+      }
+    }
+  }
+
+  private DAG createDag(TezConfiguration tezConf, Path lhs, Path rhs, int numPartitions)
+      throws IOException {
+    DAG dag = new DAG("IntersectValidate");
+
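+    // DAG shape: the lhsfile and rhsfile vertices forward their keys over
+    // ordered, hash-partitioned edges into the intersectvalidate vertex, which
+    // compares the two sorted key streams per partition.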
+    // Configuration for intermediate output - shared by Vertex1 and Vertex2
+    // This should only be setting selective keys from the underlying conf. Fix after there's a
+    // better mechanism to configure the IOs.
+    OrderedPartitionedKVEdgeConfigurer edgeConf = OrderedPartitionedKVEdgeConfigurer
+        .newBuilder(Text.class.getName(), NullWritable.class.getName(),
+            HashPartitioner.class.getName()).build();
+
+    Vertex lhsVertex = new Vertex(LHS_INPUT_NAME, new ProcessorDescriptor(
+        ForwardingProcessor.class.getName())).addDataSource("lhs",
+        MRInput
+            .createConfigurer(new Configuration(tezConf), TextInputFormat.class,
+                lhs.toUri().toString()).groupSplits(false).create());
+
+    Vertex rhsVertex = new Vertex(RHS_INPUT_NAME, new ProcessorDescriptor(
+        ForwardingProcessor.class.getName())).addDataSource("rhs",
+        MRInput
+            .createConfigurer(new Configuration(tezConf), TextInputFormat.class,
+                rhs.toUri().toString()).groupSplits(false).create());
+
+    Vertex intersectValidateVertex = new Vertex("intersectvalidate", new ProcessorDescriptor(
+        IntersectValidateProcessor.class.getName()), numPartitions);
+
+    Edge e1 = new Edge(lhsVertex, intersectValidateVertex, edgeConf.createDefaultEdgeProperty());
+    Edge e2 = new Edge(rhsVertex, intersectValidateVertex, edgeConf.createDefaultEdgeProperty());
+
+    dag.addVertex(lhsVertex).addVertex(rhsVertex).addVertex(intersectValidateVertex).addEdge(e1)
+        .addEdge(e2);
+    return dag;
+  }
+
+  public static class IntersectValidateProcessor extends SimpleProcessor {
+
+    private static final Log LOG = LogFactory.getLog(IntersectValidateProcessor.class);
+
+    public IntersectValidateProcessor(ProcessorContext context) {
+      super(context);
+    }
+
+    @Override
+    public void run() throws Exception {
+      Preconditions.checkState(getInputs().size() == 2);
+      Preconditions.checkState(getOutputs().size() == 0);
+      LogicalInput lhsInput = getInputs().get(LHS_INPUT_NAME);
+      LogicalInput rhsInput = getInputs().get(RHS_INPUT_NAME);
+      Reader lhsReaderRaw = lhsInput.getReader();
+      Reader rhsReaderRaw = rhsInput.getReader();
+      Preconditions.checkState(lhsReaderRaw instanceof KeyValuesReader);
+      Preconditions.checkState(rhsReaderRaw instanceof KeyValuesReader);
+      KeyValuesReader lhsReader = (KeyValuesReader) lhsReaderRaw;
+      KeyValuesReader rhsReader = (KeyValuesReader) rhsReaderRaw;
+
+      TezCounter lhsMissingKeyCounter = getContext().getCounters().findCounter(COUNTER_GROUP_NAME,
+          MISSING_KEY_COUNTER_NAME);
+
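+      // Both inputs arrive sorted and identically partitioned, so equality can
+      // be checked by walking the two key streams in lockstep; any mismatch or
+      // leftover key on either side increments the missing-key counter.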
+      while (lhsReader.next()) {
+        if (rhsReader.next()) {
+          if (!lhsReader.getCurrentKey().equals(rhsReader.getCurrentKey())) {
+            LOG.info("MismatchedKeys: " + "lhs=" + lhsReader.getCurrentKey() + ", rhs=" + rhsReader.getCurrentKey());
+            lhsMissingKeyCounter.increment(1);
+          }
+        } else {
+          lhsMissingKeyCounter.increment(1);
+          LOG.info("ExtraKey in lhs: " + lhsReader.getClass());
+          break;
+        }
+      }
+      if (rhsReader.next()) {
+        lhsMissingKeyCounter.increment(1);
+        LOG.info("ExtraKey in rhs: " + lhsReader.getClass());
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/41f5cd8a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/Join.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/Join.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/Join.java
new file mode 100644
index 0000000..a3a65ec
--- /dev/null
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/Join.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.examples;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.ClusterStatus;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.join.CompositeInputFormat;
+import org.apache.hadoop.mapreduce.lib.join.TupleWritable;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Given a set of sorted datasets keyed with the same class and yielding
+ * equal partitions, it is possible to effect a join of those datasets 
+ * prior to the map. This example demonstrates that.
+ *
+ * To run: bin/hadoop jar build/hadoop-examples.jar join
+ *            [-r <i>reduces</i>]
+ *            [-inFormat <i>input format class</i>] 
+ *            [-outFormat <i>output format class</i>] 
+ *            [-outKey <i>output key class</i>] 
+ *            [-outValue <i>output value class</i>] 
+ *            [-joinOp &lt;inner|outer|override&gt;]
+ *            [<i>in-dir</i>]* <i>in-dir</i> <i>out-dir</i> 
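+ *
+ * For example (illustrative paths), "-joinOp inner" over two input directories
+ * expands into a join expression of the form:
+ *   inner(tbl(org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat,"/data/a"),
+ *         tbl(org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat,"/data/b"))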
+ */
+public class Join extends Configured implements Tool {
+  public static final String REDUCES_PER_HOST = "mapreduce.join.reduces_per_host";
+  static int printUsage() {
+    System.out.println("join [-r <reduces>] " +
+                       "[-inFormat <input format class>] " +
+                       "[-outFormat <output format class>] " + 
+                       "[-outKey <output key class>] " +
+                       "[-outValue <output value class>] " +
+                       "[-joinOp <inner|outer|override>] " +
+                       "[input]* <input> <output>");
+    ToolRunner.printGenericCommandUsage(System.out);
+    return 2;
+  }
+
+  /**
+   * The main driver for the join program.
+   * Invoke this method to submit the map/reduce job.
+   * @throws IOException When there are communication problems with the
+   *                     job tracker.
+   */
+  @SuppressWarnings("unchecked")
+  public int run(String[] args) throws Exception {
+    Configuration conf = getConf();
+    JobClient client = new JobClient(conf);
+    ClusterStatus cluster = client.getClusterStatus();
+    int num_reduces = (int) (cluster.getMaxReduceTasks() * 0.9);
+    String join_reduces = conf.get(REDUCES_PER_HOST);
+    if (join_reduces != null) {
+       num_reduces = cluster.getTaskTrackers() * 
+                       Integer.parseInt(join_reduces);
+    }
+    Job job = new Job(conf);
+    job.setJobName("join");
+    job.setJarByClass(Sort.class);
+
+    job.setMapperClass(Mapper.class);        
+    job.setReducerClass(Reducer.class);
+
+    Class<? extends InputFormat> inputFormatClass = 
+      SequenceFileInputFormat.class;
+    Class<? extends OutputFormat> outputFormatClass = 
+      SequenceFileOutputFormat.class;
+    Class<? extends WritableComparable> outputKeyClass = BytesWritable.class;
+    Class<? extends Writable> outputValueClass = TupleWritable.class;
+    String op = "inner";
+    List<String> otherArgs = new ArrayList<String>();
+    for(int i=0; i < args.length; ++i) {
+      try {
+        if ("-r".equals(args[i])) {
+          num_reduces = Integer.parseInt(args[++i]);
+        } else if ("-inFormat".equals(args[i])) {
+          inputFormatClass = 
+            Class.forName(args[++i]).asSubclass(InputFormat.class);
+        } else if ("-outFormat".equals(args[i])) {
+          outputFormatClass = 
+            Class.forName(args[++i]).asSubclass(OutputFormat.class);
+        } else if ("-outKey".equals(args[i])) {
+          outputKeyClass = 
+            Class.forName(args[++i]).asSubclass(WritableComparable.class);
+        } else if ("-outValue".equals(args[i])) {
+          outputValueClass = 
+            Class.forName(args[++i]).asSubclass(Writable.class);
+        } else if ("-joinOp".equals(args[i])) {
+          op = args[++i];
+        } else {
+          otherArgs.add(args[i]);
+        }
+      } catch (NumberFormatException except) {
+        System.out.println("ERROR: Integer expected instead of " + args[i]);
+        return printUsage();
+      } catch (ArrayIndexOutOfBoundsException except) {
+        System.out.println("ERROR: Required parameter missing from " +
+            args[i-1]);
+        return printUsage(); // exits
+      }
+    }
+
+    // Set user-supplied (possibly default) job configs
+    job.setNumReduceTasks(num_reduces);
+
+    if (otherArgs.size() < 2) {
+      System.out.println("ERROR: Wrong number of parameters: ");
+      return printUsage();
+    }
+
+    FileOutputFormat.setOutputPath(job, 
+      new Path(otherArgs.remove(otherArgs.size() - 1)));
+    List<Path> plist = new ArrayList<Path>(otherArgs.size());
+    for (String s : otherArgs) {
+      plist.add(new Path(s));
+    }
+
+    job.setInputFormatClass(CompositeInputFormat.class);
+    job.getConfiguration().set(CompositeInputFormat.JOIN_EXPR, 
+      CompositeInputFormat.compose(op, inputFormatClass,
+      plist.toArray(new Path[0])));
+    job.setOutputFormatClass(outputFormatClass);
+
+    job.setOutputKeyClass(outputKeyClass);
+    job.setOutputValueClass(outputValueClass);
+
+    Date startTime = new Date();
+    System.out.println("Job started: " + startTime);
+    int ret = job.waitForCompletion(true) ? 0 : 1 ;
+    Date end_time = new Date();
+    System.out.println("Job ended: " + end_time);
+    System.out.println("The job took " + 
+        (end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
+    return ret;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(), new Join(), args);
+    System.exit(res);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/41f5cd8a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
new file mode 100644
index 0000000..b22a4a8
--- /dev/null
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
@@ -0,0 +1,757 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce.examples;
+
+import java.io.IOException;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.util.ClassUtil;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.client.TezClientUtils;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DataSourceDescriptor;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
+import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
+import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.tez.mapreduce.output.MROutputLegacy;
+import org.apache.tez.mapreduce.processor.map.MapProcessor;
+import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
+import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfigurer;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.tez.runtime.library.partitioner.HashPartitioner;
+
+/**
+ * Dummy class for testing the MR framework. Sleeps for a defined period
+ * of time in the mapper and reducer, and generates fake input for the
+ * map / reduce stages. Note that the number of generated input pairs is
+ * on the order of <code>numMappers * mapSleepTime / 100</code>, so the
+ * job uses some disk space.
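+ *
+ * A typical invocation might look like the following (the launcher command
+ * and jar name are illustrative only; the option values are arbitrary):
+ *
+ *   $ hadoop jar tez-tests-*.jar \
+ *       org.apache.tez.mapreduce.examples.MRRSleepJob \
+ *       -m 4 -r 2 -ir 2 -irs 1 -mt 5000 -rt 5000 -irt 2000 -recordt 100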
+ */
+public class MRRSleepJob extends Configured implements Tool {
+
+  private static final Log LOG = LogFactory.getLog(MRRSleepJob.class);
+
+  public static final String MAP_SLEEP_COUNT = "mrr.sleepjob.map.sleep.count";
+  public static final String REDUCE_SLEEP_COUNT =
+    "mrr.sleepjob.reduce.sleep.count";
+  public static final String MAP_SLEEP_TIME = "mrr.sleepjob.map.sleep.time";
+  public static final String REDUCE_SLEEP_TIME =
+    "mrr.sleepjob.reduce.sleep.time";
+  public static final String IREDUCE_SLEEP_COUNT =
+      "mrr.sleepjob.ireduce.sleep.count";
+  public static final String IREDUCE_SLEEP_TIME =
+      "mrr.sleepjob.ireduce.sleep.time";
+  public static final String IREDUCE_STAGES_COUNT =
+      "mrr.sleepjob.ireduces.stages.count";
+  public static final String IREDUCE_TASKS_COUNT =
+      "mrr.sleepjob.ireduces.tasks.count";
+
+  // Flags to inject failures
+  public static final String MAP_THROW_ERROR = "mrr.sleepjob.map.throw.error";
+  public static final String MAP_FATAL_ERROR = "mrr.sleepjob.map.fatal.error";
+  public static final String MAP_ERROR_TASK_IDS =
+      "mrr.sleepjob.map.error.task.ids";
+
+  public static class MRRSleepJobPartitioner extends
+      Partitioner<IntWritable, IntWritable> {
+    public int getPartition(IntWritable k, IntWritable v, int numPartitions) {
+      return k.get() % numPartitions;
+    }
+  }
+
+  public static class EmptySplit extends InputSplit implements Writable {
+    public void write(DataOutput out) throws IOException { }
+    public void readFields(DataInput in) throws IOException { }
+    public long getLength() { return 0L; }
+    public String[] getLocations() { return new String[0]; }
+  }
+
+  public static class SleepInputFormat
+      extends InputFormat<IntWritable,IntWritable> {
+
+    public List<InputSplit> getSplits(JobContext jobContext) {
+      List<InputSplit> ret = new ArrayList<InputSplit>();
+      int numSplits = jobContext.getConfiguration().
+                        getInt(MRJobConfig.NUM_MAPS, 1);
+      for (int i = 0; i < numSplits; ++i) {
+        ret.add(new EmptySplit());
+      }
+      return ret;
+    }
+
+    public RecordReader<IntWritable,IntWritable> createRecordReader(
+        InputSplit ignored, TaskAttemptContext taskContext)
+        throws IOException {
+      Configuration conf = taskContext.getConfiguration();
+
+      final int count = conf.getInt(MAP_SLEEP_COUNT, 1);
+      if (count < 0) {
+        throw new IOException("Invalid map count: " + count);
+      }
+
+      int totalIReduces = conf.getInt(IREDUCE_STAGES_COUNT, 1);
+
+      int reduceTasks = totalIReduces == 0 ?
+          taskContext.getNumReduceTasks() :
+          conf.getInt(IREDUCE_TASKS_COUNT, 1);
+      int sleepCount = totalIReduces == 0 ?
+          conf.getInt(REDUCE_SLEEP_COUNT, 1) :
+          conf.getInt(IREDUCE_SLEEP_COUNT, 1);
+      final int emitPerMapTask = sleepCount * reduceTasks;
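+      // The values emitted over the 'count' records sum to emitPerMapTask,
+      // so each downstream task ends up with roughly sleepCount keys after
+      // partitioning.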
+
+      return new RecordReader<IntWritable,IntWritable>() {
+        private int records = 0;
+        private int emitCount = 0;
+        private IntWritable key = null;
+        private IntWritable value = null;
+
+        public void initialize(InputSplit split, TaskAttemptContext context) {
+        }
+
+        public boolean nextKeyValue()
+            throws IOException {
+          if (count == 0) {
+            return false;
+          }
+          key = new IntWritable();
+          key.set(emitCount);
+          int emit = emitPerMapTask / count;
+          if ((emitPerMapTask) % count > records) {
+            ++emit;
+          }
+          emitCount += emit;
+          value = new IntWritable();
+          value.set(emit);
+          return records++ < count;
+        }
+        public IntWritable getCurrentKey() { return key; }
+        public IntWritable getCurrentValue() { return value; }
+        public void close() throws IOException { }
+        public float getProgress() throws IOException {
+          return count == 0 ? 1.0f : records / ((float)count);
+        }
+      };
+    }
+  }
+
+  public static class SleepMapper
+      extends Mapper<IntWritable, IntWritable, IntWritable, IntWritable> {
+    private long mapSleepDuration = 100;
+    private int mapSleepCount = 1;
+    private int count = 0;
+    private String vertexName;
+    private boolean throwError = false;
+    private boolean throwFatal = false;
+    private boolean finalAttempt = false;
+
+    protected void setup(Context context)
+      throws IOException, InterruptedException {
+      Configuration conf = context.getConfiguration();
+      this.mapSleepCount =
+        conf.getInt(MAP_SLEEP_COUNT, mapSleepCount);
+      this.mapSleepDuration = mapSleepCount == 0 ? 0 :
+        conf.getLong(MAP_SLEEP_TIME, 100) / mapSleepCount;
+      vertexName = conf.get(
+          org.apache.tez.mapreduce.hadoop.MRJobConfig.VERTEX_NAME);
+
+      TaskAttemptID taId = context.getTaskAttemptID();
+
+      String[] taskIds = conf.getStrings(MAP_ERROR_TASK_IDS);
+      if (taId.getId()+1 >= context.getMaxMapAttempts()) {
+        finalAttempt = true;
+      }
+      boolean found = false;
+      if (taskIds != null) {
+        if (taskIds.length == 1 && taskIds[0].equals("*")) {
+          found = true;
+        }
+        if (!found) {
+          for (String taskId : taskIds) {
+            if (Integer.valueOf(taskId).intValue() ==
+                taId.getTaskID().getId()) {
+              found = true;
+              break;
+            }
+          }
+        }
+      }
+      if (found) {
+        if (!finalAttempt) {
+          throwError = conf.getBoolean(MAP_THROW_ERROR, false);
+        }
+        throwFatal = conf.getBoolean(MAP_FATAL_ERROR, false);
+      }
+    }
+
+    public void map(IntWritable key, IntWritable value, Context context
+               ) throws IOException, InterruptedException {
+      // It is expected that every map processes mapSleepCount records.
+      try {
+        LOG.info("Reading in " + vertexName
+            + " taskid " + context.getTaskAttemptID().getTaskID().getId()
+            + " key " + key.get());
+        LOG.info("Sleeping in InitialMap"
+            + ", vertexName=" + vertexName
+            + ", taskAttemptId=" + context.getTaskAttemptID()
+            + ", mapSleepDuration=" + mapSleepDuration
+            + ", mapSleepCount=" + mapSleepCount
+            + ", sleepLeft="
+            + (mapSleepDuration * (mapSleepCount - count)));
+        context.setStatus("Sleeping... (" +
+          (mapSleepDuration * (mapSleepCount - count)) + ") ms left");
+        if ((mapSleepCount - count) > 0) {
+          Thread.sleep(mapSleepDuration);
+        }
+        if (throwError || throwFatal) {
+          throw new IOException("Throwing a simulated error from map");
+        }
+      }
+      catch (InterruptedException ex) {
+        throw (IOException)new IOException(
+            "Interrupted while sleeping").initCause(ex);
+      }
+      ++count;
+      // Emit value.get() consecutive keys, each with value 1, so that every
+      // downstream task receives its share of the generated records.
+      int k = key.get();
+      for (int i = 0; i < value.get(); ++i) {
+        LOG.info("Writing in " + vertexName
+            + " taskid " + context.getTaskAttemptID().getTaskID().getId()
+            + " key " + (k+i) + " value 1");
+        context.write(new IntWritable(k + i), new IntWritable(1));
+      }
+    }
+  }
+
+  public static class ISleepReducer
+  extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
+    private long iReduceSleepDuration = 100;
+    private int iReduceSleepCount = 1;
+    private int count = 0;
+    private String vertexName;
+
+    protected void setup(Context context)
+        throws IOException, InterruptedException {
+      Configuration conf = context.getConfiguration();
+      this.iReduceSleepCount =
+          conf.getInt(IREDUCE_SLEEP_COUNT, iReduceSleepCount);
+      this.iReduceSleepDuration = iReduceSleepCount == 0 ? 0 :
+        conf.getLong(IREDUCE_SLEEP_TIME, 100) / iReduceSleepCount;
+      vertexName = conf.get(
+          org.apache.tez.mapreduce.hadoop.MRJobConfig.VERTEX_NAME);
+    }
+
+    public void reduce(IntWritable key, Iterable<IntWritable> values,
+        Context context)
+            throws IOException, InterruptedException {
+      try {
+        LOG.info("Reading in " + vertexName
+            + " taskid " + context.getTaskAttemptID().getTaskID().getId()
+            + " key " + key.get());
+
+        LOG.info("Sleeping in IntermediateReduce"
+            + ", vertexName=" + vertexName
+            + ", taskAttemptId=" + context.getTaskAttemptID()
+            + ", iReduceSleepDuration=" + iReduceSleepDuration
+            + ", iReduceSleepCount=" + iReduceSleepCount
+            + ", sleepLeft="
+            + (iReduceSleepDuration * (iReduceSleepCount - count)));
+        context.setStatus("Sleeping... (" +
+          (iReduceSleepDuration * (iReduceSleepCount - count)) + ") ms left");
+        if ((iReduceSleepCount - count) > 0) {
+          Thread.sleep(iReduceSleepDuration);
+        }
+      }
+      catch (InterruptedException ex) {
+        throw (IOException)new IOException(
+            "Interrupted while sleeping").initCause(ex);
+      }
+      ++count;
+      // For each incoming value, emit that many consecutive keys (each with
+      // value 1) so the next stage receives its share of the records.
+      int k = key.get();
+      for (IntWritable value : values) {
+        for (int i = 0; i < value.get(); ++i) {
+          LOG.info("Writing in " + vertexName
+              + " taskid " + context.getTaskAttemptID().getTaskID().getId()
+              + " key " + (k+i) + " value 1");
+          context.write(new IntWritable(k + i), new IntWritable(1));
+        }
+      }
+    }
+  }
+
+  public static class SleepReducer
+      extends Reducer<IntWritable, IntWritable, NullWritable, NullWritable> {
+    private long reduceSleepDuration = 100;
+    private int reduceSleepCount = 1;
+    private int count = 0;
+    private String vertexName;
+
+    protected void setup(Context context)
+      throws IOException, InterruptedException {
+      Configuration conf = context.getConfiguration();
+      this.reduceSleepCount =
+        conf.getInt(REDUCE_SLEEP_COUNT, reduceSleepCount);
+      this.reduceSleepDuration = reduceSleepCount == 0 ? 0 :
+        conf.getLong(REDUCE_SLEEP_TIME, 100) / reduceSleepCount;
+      vertexName = conf.get(
+          org.apache.tez.mapreduce.hadoop.MRJobConfig.VERTEX_NAME);
+    }
+
+    public void reduce(IntWritable key, Iterable<IntWritable> values,
+                       Context context)
+      throws IOException {
+      try {
+        LOG.info("Reading in " + vertexName
+            + " taskid " + context.getTaskAttemptID().getTaskID().getId()
+            + " key " + key.get());
+        LOG.info("Sleeping in FinalReduce"
+            + ", vertexName=" + vertexName
+            + ", taskAttemptId=" + context.getTaskAttemptID()
+            + ", reduceSleepDuration=" + reduceSleepDuration
+            + ", reduceSleepCount=" + reduceSleepCount
+            + ", sleepLeft="
+            + (reduceSleepDuration * (reduceSleepCount - count)));
+        context.setStatus("Sleeping... (" +
+            (reduceSleepDuration * (reduceSleepCount - count)) + ") ms left");
+        if ((reduceSleepCount - count) > 0) {
+          Thread.sleep(reduceSleepDuration);
+        }
+      }
+      catch (InterruptedException ex) {
+        throw (IOException)new IOException(
+          "Interrupted while sleeping").initCause(ex);
+      }
+      count++;
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(), new MRRSleepJob(), args);
+    System.exit(res);
+  }
+  
+  private Credentials credentials = new Credentials();
+
+  public DAG createDAG(FileSystem remoteFs, Configuration conf, Path remoteStagingDir,
+      int numMapper, int numReducer, int iReduceStagesCount,
+      int numIReducer, long mapSleepTime, int mapSleepCount,
+      long reduceSleepTime, int reduceSleepCount,
+      long iReduceSleepTime, int iReduceSleepCount, boolean writeSplitsToDFS,
+      boolean generateSplitsInAM)
+      throws IOException, YarnException {
+
+
+    Configuration mapStageConf = new JobConf(conf);
+    mapStageConf.setInt(MRJobConfig.NUM_MAPS, numMapper);
+    mapStageConf.setLong(MAP_SLEEP_TIME, mapSleepTime);
+    mapStageConf.setLong(REDUCE_SLEEP_TIME, reduceSleepTime);
+    mapStageConf.setLong(IREDUCE_SLEEP_TIME, iReduceSleepTime);
+    mapStageConf.setInt(MAP_SLEEP_COUNT, mapSleepCount);
+    mapStageConf.setInt(REDUCE_SLEEP_COUNT, reduceSleepCount);
+    mapStageConf.setInt(IREDUCE_SLEEP_COUNT, iReduceSleepCount);
+    mapStageConf.setInt(IREDUCE_STAGES_COUNT, iReduceStagesCount);
+    mapStageConf.setInt(IREDUCE_TASKS_COUNT, numIReducer);
+    mapStageConf.set(MRJobConfig.MAP_CLASS_ATTR, SleepMapper.class.getName());
+    mapStageConf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR,
+        SleepInputFormat.class.getName());
+    if (numIReducer == 0 && numReducer == 0) {
+      mapStageConf.set(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR,
+          NullOutputFormat.class.getName());
+    }
+
+    MRHelpers.translateMRConfToTez(mapStageConf);
+
+    Configuration[] intermediateReduceStageConfs = null;
+    if (iReduceStagesCount > 0
+        && numIReducer > 0) {
+      intermediateReduceStageConfs = new JobConf[iReduceStagesCount];
+      for (int i = 1; i <= iReduceStagesCount; ++i) {
+        JobConf iReduceStageConf = new JobConf(conf);
+        iReduceStageConf.setLong(MRRSleepJob.REDUCE_SLEEP_TIME, iReduceSleepTime);
+        iReduceStageConf.setInt(MRRSleepJob.REDUCE_SLEEP_COUNT, iReduceSleepCount);
+        iReduceStageConf.setInt(MRJobConfig.NUM_REDUCES, numIReducer);
+        iReduceStageConf
+            .set(MRJobConfig.REDUCE_CLASS_ATTR, ISleepReducer.class.getName());
+        iReduceStageConf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
+            IntWritable.class.getName());
+        iReduceStageConf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
+            IntWritable.class.getName());
+        iReduceStageConf.set(MRJobConfig.PARTITIONER_CLASS_ATTR,
+            MRRSleepJobPartitioner.class.getName());
+
+
+        MRHelpers.translateMRConfToTez(iReduceStageConf);
+        intermediateReduceStageConfs[i-1] = iReduceStageConf;
+      }
+    }
+
+    Configuration finalReduceConf = null;
+    if (numReducer > 0) {
+      finalReduceConf = new JobConf(conf);
+      finalReduceConf.setLong(MRRSleepJob.REDUCE_SLEEP_TIME, reduceSleepTime);
+      finalReduceConf.setInt(MRRSleepJob.REDUCE_SLEEP_COUNT, reduceSleepCount);
+      finalReduceConf.setInt(MRJobConfig.NUM_REDUCES, numReducer);
+      finalReduceConf.set(MRJobConfig.REDUCE_CLASS_ATTR, SleepReducer.class.getName());
+      finalReduceConf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
+          IntWritable.class.getName());
+      finalReduceConf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
+          IntWritable.class.getName());
+      finalReduceConf.set(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR,
+          NullOutputFormat.class.getName());
+
+      MRHelpers.translateMRConfToTez(finalReduceConf);
+    }
+
+    MRHelpers.configureMRApiUsage(mapStageConf);
+    if (iReduceStagesCount > 0
+        && numIReducer > 0) {
+      for (int i = 0; i < iReduceStagesCount; ++i) {
+        MRHelpers.configureMRApiUsage(intermediateReduceStageConfs[i]);
+      }
+    }
+    if (numReducer > 0) {
+      MRHelpers.configureMRApiUsage(finalReduceConf);
+    }
+
+    DataSourceDescriptor dataSource = null;
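+    // Splits can either be pre-generated and written to the staging
+    // directory on DFS (legacy MR-style split generation), or configured
+    // through MRInputLegacy, which can defer split generation to the Tez AM.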
+    if (!generateSplitsInAM && writeSplitsToDFS) {
+
+      LOG.info("Writing splits to DFS");
+      dataSource = MRInputHelpers
+          .configureMRInputWithLegacySplitGeneration(mapStageConf, remoteStagingDir, true);
+    } else {
+      dataSource = MRInputLegacy.createConfigurer(mapStageConf, SleepInputFormat.class)
+          .generateSplitsInAM(generateSplitsInAM).create();
+    }
+
+    DAG dag = new DAG("MRRSleepJob");
+    String jarPath = ClassUtil.findContainingJar(getClass());
+    if (jarPath == null) {
+      throw new TezUncheckedException("Could not find any jar containing"
+          + " MRRSleepJob.class in the classpath");
+    }
+    Path remoteJarPath = remoteFs.makeQualified(
+        new Path(remoteStagingDir, "dag_job.jar"));
+    remoteFs.copyFromLocalFile(new Path(jarPath), remoteJarPath);
+    FileStatus jarFileStatus = remoteFs.getFileStatus(remoteJarPath);
+    
+    TokenCache.obtainTokensForNamenodes(this.credentials, new Path[] { remoteJarPath },
+        mapStageConf);
+
+    Map<String, LocalResource> commonLocalResources =
+        new HashMap<String, LocalResource>();
+    LocalResource dagJarLocalRsrc = LocalResource.newInstance(
+        ConverterUtils.getYarnUrlFromPath(remoteJarPath),
+        LocalResourceType.FILE,
+        LocalResourceVisibility.APPLICATION,
+        jarFileStatus.getLen(),
+        jarFileStatus.getModificationTime());
+    commonLocalResources.put("dag_job.jar", dagJarLocalRsrc);
+
+    List<Vertex> vertices = new ArrayList<Vertex>();
+
+    
+    UserPayload mapUserPayload = TezUtils.createUserPayloadFromConf(mapStageConf);
+    int numTasks = generateSplitsInAM ? -1 : numMapper;
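+    // A parallelism of -1 lets the number of map tasks be determined by the
+    // splits generated in the AM.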
+
+    Vertex mapVertex = new Vertex("map", new ProcessorDescriptor(
+        MapProcessor.class.getName()).setUserPayload(mapUserPayload), numTasks)
+        .setTaskLocalFiles(commonLocalResources);
+    mapVertex.addDataSource("MRInput", dataSource);
+    vertices.add(mapVertex);
+
+    if (iReduceStagesCount > 0
+        && numIReducer > 0) {
+      for (int i = 0; i < iReduceStagesCount; ++i) {
+        Configuration iconf =
+            intermediateReduceStageConfs[i];
+        UserPayload iReduceUserPayload = TezUtils.createUserPayloadFromConf(iconf);
+        Vertex ivertex = new Vertex("ireduce" + (i+1),
+                new ProcessorDescriptor(ReduceProcessor.class.getName()).
+                setUserPayload(iReduceUserPayload), numIReducer);
+        ivertex.setTaskLocalFiles(commonLocalResources);
+        vertices.add(ivertex);
+      }
+    }
+
+    Vertex finalReduceVertex = null;
+    if (numReducer > 0) {
+      UserPayload reducePayload = TezUtils.createUserPayloadFromConf(finalReduceConf);
+      finalReduceVertex = new Vertex("reduce", new ProcessorDescriptor(
+          ReduceProcessor.class.getName()).setUserPayload(reducePayload), numReducer);
+      finalReduceVertex.setTaskLocalFiles(commonLocalResources);
+      finalReduceVertex.addDataSink("MROutput", MROutputLegacy.createConfigurer(finalReduceConf,
+          NullOutputFormat.class).create());
+      vertices.add(finalReduceVertex);
+    } else {
+      // Map only job
+      mapVertex.addDataSink("MROutput",
+          MROutputLegacy.createConfigurer(mapStageConf, NullOutputFormat.class).create());
+    }
+
+
+    Map<String, String> partitionerConf = Maps.newHashMap();
+    partitionerConf.put(MRJobConfig.PARTITIONER_CLASS_ATTR, MRRSleepJobPartitioner.class.getName());
+    OrderedPartitionedKVEdgeConfigurer edgeConf = OrderedPartitionedKVEdgeConfigurer
+        .newBuilder(IntWritable.class.getName(), IntWritable.class.getName(),
+            HashPartitioner.class.getName(), partitionerConf).configureInput().useLegacyInput()
+        .done().build();
+
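+    // Wire the vertices into a linear chain (map -> ireduce1 -> ... -> reduce)
+    // using the ordered partitioned edge defined above.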
+    for (int i = 0; i < vertices.size(); ++i) {
+      dag.addVertex(vertices.get(i));
+      if (i != 0) {
+        dag.addEdge(
+            new Edge(vertices.get(i - 1), vertices.get(i), edgeConf.createDefaultEdgeProperty()));
+      }
+    }
+
+    return dag;
+  }
+
+  @VisibleForTesting
+  public Job createJob(int numMapper, int numReducer, int iReduceStagesCount,
+      int numIReducer, long mapSleepTime, int mapSleepCount,
+      long reduceSleepTime, int reduceSleepCount,
+      long iReduceSleepTime, int iReduceSleepCount)
+          throws IOException {
+    Configuration conf = getConf();
+    conf.setLong(MAP_SLEEP_TIME, mapSleepTime);
+    conf.setLong(REDUCE_SLEEP_TIME, reduceSleepTime);
+    conf.setLong(IREDUCE_SLEEP_TIME, iReduceSleepTime);
+    conf.setInt(MAP_SLEEP_COUNT, mapSleepCount);
+    conf.setInt(REDUCE_SLEEP_COUNT, reduceSleepCount);
+    conf.setInt(IREDUCE_SLEEP_COUNT, iReduceSleepCount);
+    conf.setInt(MRJobConfig.NUM_MAPS, numMapper);
+    conf.setInt(IREDUCE_STAGES_COUNT, iReduceStagesCount);
+    conf.setInt(IREDUCE_TASKS_COUNT, numIReducer);
+
+    // Configure intermediate reduces
+    conf.setInt(
+        org.apache.tez.mapreduce.hadoop.MRJobConfig.MRR_INTERMEDIATE_STAGES,
+        iReduceStagesCount);
+    LOG.info("Running MRR with " + iReduceStagesCount + " IR stages");
+
+    for (int i = 1; i <= iReduceStagesCount; ++i) {
+      // Set reducer class for intermediate reduce
+      conf.setClass(
+          MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(i,
+              "mapreduce.job.reduce.class"), ISleepReducer.class, Reducer.class);
+      // Set reducer output key class
+      conf.setClass(
+          MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(i,
+              "mapreduce.map.output.key.class"), IntWritable.class, Object.class);
+      // Set reducer output value class
+      conf.setClass(
+          MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(i,
+              "mapreduce.map.output.value.class"), IntWritable.class, Object.class);
+      conf.setInt(
+          MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(i,
+              "mapreduce.job.reduces"), numIReducer);
+    }
+
+    Job job = Job.getInstance(conf, "sleep");
+    job.setNumReduceTasks(numReducer);
+    job.setJarByClass(MRRSleepJob.class);
+    job.setMapperClass(SleepMapper.class);
+    job.setMapOutputKeyClass(IntWritable.class);
+    job.setMapOutputValueClass(IntWritable.class);
+    job.setReducerClass(SleepReducer.class);
+    job.setOutputFormatClass(NullOutputFormat.class);
+    job.setInputFormatClass(SleepInputFormat.class);
+    job.setPartitionerClass(MRRSleepJobPartitioner.class);
+    job.setSpeculativeExecution(false);
+    job.setJobName("Sleep job");
+
+    FileInputFormat.addInputPath(job, new Path("ignored"));
+    return job;
+  }
+
+  public int run(String[] args) throws Exception {
+
+    if(args.length < 1) {
+      System.err.println("MRRSleepJob [-m numMapper] [-r numReducer]" +
+          " [-ir numIntermediateReducer]" +
+          " [-irs numIntermediateReducerStages]" +
+          " [-mt mapSleepTime (msec)] [-rt reduceSleepTime (msec)]" +
+          " [-irt intermediateReduceSleepTime]" +
+          " [-recordt recordSleepTime (msec)]" +
+          " [-generateSplitsInAM (false)/true]" +
+          " [-writeSplitsToDfs (false)/true]");
+      ToolRunner.printGenericCommandUsage(System.err);
+      return 2;
+    }
+
+    int numMapper = 1, numReducer = 1, numIReducer = 1;
+    long mapSleepTime = 100, reduceSleepTime = 100, recSleepTime = 100,
+        iReduceSleepTime = 1;
+    int mapSleepCount = 1, reduceSleepCount = 1, iReduceSleepCount = 1;
+    int iReduceStagesCount = 1;
+    boolean writeSplitsToDfs = false;
+    boolean generateSplitsInAM = false;
+    boolean splitsOptionFound = false;
+
+    for(int i=0; i < args.length; i++ ) {
+      if(args[i].equals("-m")) {
+        numMapper = Integer.parseInt(args[++i]);
+      }
+      else if(args[i].equals("-r")) {
+        numReducer = Integer.parseInt(args[++i]);
+      }
+      else if(args[i].equals("-ir")) {
+        numIReducer = Integer.parseInt(args[++i]);
+      }
+      else if(args[i].equals("-mt")) {
+        mapSleepTime = Long.parseLong(args[++i]);
+      }
+      else if(args[i].equals("-rt")) {
+        reduceSleepTime = Long.parseLong(args[++i]);
+      }
+      else if(args[i].equals("-irt")) {
+        iReduceSleepTime = Long.parseLong(args[++i]);
+      }
+      else if(args[i].equals("-irs")) {
+        iReduceStagesCount = Integer.parseInt(args[++i]);
+      }
+      else if (args[i].equals("-recordt")) {
+        recSleepTime = Long.parseLong(args[++i]);
+      }
+      else if (args[i].equals("-generateSplitsInAM")) {
+        if (splitsOptionFound) {
+          throw new RuntimeException("Cannot use both -generateSplitsInAM and -writeSplitsToDfs together");
+        }
+        splitsOptionFound = true;
+        generateSplitsInAM = Boolean.parseBoolean(args[++i]);
+        
+      }
+      else if (args[i].equals("-writeSplitsToDfs")) {
+        if (splitsOptionFound) {
+          throw new RuntimeException("Cannot use both -generateSplitsInAM and -writeSplitsToDfs together");
+        }
+        splitsOptionFound = true;
+        writeSplitsToDfs = Boolean.parseBoolean(args[++i]);
+      }
+    }
+
+    if (numIReducer > 0 && numReducer <= 0) {
+      throw new RuntimeException("Cannot have intermediate reduces without"
+          + " a final reduce");
+    }
+
+    // Sleep for a total of *SleepTime per task, in increments of
+    // recSleepTime per record.
+    mapSleepCount = (int)Math.ceil(mapSleepTime / ((double)recSleepTime));
+    reduceSleepCount = (int)Math.ceil(reduceSleepTime / ((double)recSleepTime));
+    iReduceSleepCount = (int)Math.ceil(iReduceSleepTime / ((double)recSleepTime));
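+    // For example, mapSleepTime=5000 with the default recSleepTime of 100
+    // yields mapSleepCount = ceil(5000 / 100) = 50 records, each of which
+    // sleeps for roughly 100 ms in the map task.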
+
+    TezConfiguration conf = new TezConfiguration(getConf());
+    FileSystem remoteFs = FileSystem.get(conf);
+
+    conf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
+        conf.get(
+            TezConfiguration.TEZ_AM_STAGING_DIR,
+            TezConfiguration.TEZ_AM_STAGING_DIR_DEFAULT));
+    
+    Path remoteStagingDir =
+        remoteFs.makeQualified(new Path(conf.get(
+            TezConfiguration.TEZ_AM_STAGING_DIR,
+            TezConfiguration.TEZ_AM_STAGING_DIR_DEFAULT),
+            Long.toString(System.currentTimeMillis())));
+    TezClientUtils.ensureStagingDirExists(conf, remoteStagingDir);
+
+    DAG dag = createDAG(remoteFs, conf, remoteStagingDir,
+        numMapper, numReducer, iReduceStagesCount, numIReducer,
+        mapSleepTime, mapSleepCount, reduceSleepTime, reduceSleepCount,
+        iReduceSleepTime, iReduceSleepCount, writeSplitsToDfs, generateSplitsInAM);
+
+    TezClient tezSession = new TezClient("MRRSleep", conf, false, null, credentials);
+    tezSession.start();
+    DAGClient dagClient = tezSession.submitDAG(dag);
+
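+    // Poll the DAG status once a second until it reaches a terminal state.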
+    while (true) {
+      DAGStatus status = dagClient.getDAGStatus(null);
+      LOG.info("DAG Status: " + status);
+      if (status.isCompleted()) {
+        break;
+      }
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        // do nothing
+      }
+    }
+    tezSession.stop();
+
+    return dagClient.getDAGStatus(null).getState().equals(DAGStatus.State.SUCCEEDED) ? 0 : 1;
+  }
+
+}

