tez-commits mailing list archives

From bi...@apache.org
Subject [09/10] TEZ-1055. Rename tez-mapreduce-examples to tez-examples (Hitesh Shah via bikas)
Date Sat, 16 Aug 2014 00:54:57 GMT
http://git-wip-us.apache.org/repos/asf/tez/blob/41f5cd8a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
deleted file mode 100644
index 35a7c76..0000000
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
+++ /dev/null
@@ -1,259 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.mapreduce.examples;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.hadoop.yarn.api.records.NodeState;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.tez.client.TezClient;
-import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
-import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
-import org.apache.tez.runtime.api.ObjectRegistry;
-import org.apache.tez.runtime.api.ProcessorContext;
-import org.apache.tez.runtime.library.api.KeyValueReader;
-import org.apache.tez.runtime.library.api.KeyValueWriter;
-import org.apache.tez.runtime.library.conf.UnorderedUnpartitionedKVEdgeConfigurer;
-import org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput;
-import org.apache.tez.runtime.library.processor.SimpleProcessor;
-
-import com.google.common.base.Preconditions;
-
-public class BroadcastAndOneToOneExample extends Configured implements Tool {
-  public static class InputProcessor extends SimpleProcessor {
-    Text word = new Text();
-
-    public InputProcessor(ProcessorContext context) {
-      super(context);
-    }
-
-    @Override
-    public void run() throws Exception {
-      Preconditions.checkArgument(getOutputs().size() == 1);
-      OnFileUnorderedKVOutput output = (OnFileUnorderedKVOutput) getOutputs().values().iterator()
-          .next();
-      KeyValueWriter kvWriter = (KeyValueWriter) output.getWriter();
-      kvWriter.write(word, new IntWritable(getContext().getTaskIndex()));
-      byte[] userPayload = getContext().getUserPayload().getPayload();
-      if (userPayload != null) {
-        boolean doLocalityCheck = userPayload[0] > 0;
-        if (doLocalityCheck) {
-          ObjectRegistry objectRegistry = getContext().getObjectRegistry();
-          String entry = String.valueOf(getContext().getTaskIndex());
-          objectRegistry.cacheForDAG(entry, entry);
-        }
-      }
-    }
-  }
-
-  public static class OneToOneProcessor extends SimpleProcessor {
-    Text word = new Text();
-
-    public OneToOneProcessor(ProcessorContext context) {
-      super(context);
-    }
-
-    @Override
-    public void run() throws Exception {
-      Preconditions.checkArgument(inputs.size() == 2);
-
-      KeyValueReader inputKvReader = (KeyValueReader) getInputs().get("Input").getReader();
-      KeyValueReader broadcastKvReader = (KeyValueReader) getInputs().get("Broadcast").getReader();
-      int sum = 0;
-      while (broadcastKvReader.next()) {
-        sum += ((IntWritable) broadcastKvReader.getCurrentValue()).get();
-      }
-      while (inputKvReader.next()) {
-        sum += ((IntWritable) inputKvReader.getCurrentValue()).get();
-      }
-      boolean doLocalityCheck = getContext().getUserPayload().getPayload()[0] > 0;
-      int broadcastSum = getContext().getUserPayload().getPayload()[1];
-      int expectedSum = broadcastSum + getContext().getTaskIndex();
-      System.out.println("Index: " + getContext().getTaskIndex() + 
-          " sum: " + sum + " expectedSum: " + expectedSum + " broadcastSum: " + broadcastSum);
-      Preconditions.checkState((sum == expectedSum), "Sum = " + sum);      
-      
-      if (doLocalityCheck) {
-        ObjectRegistry objectRegistry = getContext().getObjectRegistry();
-        String index = (String) objectRegistry.get(String.valueOf(getContext().getTaskIndex()));
-        if (index == null || Integer.valueOf(index).intValue() != getContext().getTaskIndex()) {
-          String msg = "Did not find expected local producer "
-              + getContext().getTaskIndex() + " in the same JVM";
-          System.out.println(msg);
-          throw new TezUncheckedException(msg);
-        }
-      }
-    }
-
-  }
-
-  private DAG createDAG(FileSystem fs, TezConfiguration tezConf,
-      Path stagingDir, boolean doLocalityCheck) throws IOException, YarnException {
-
-    int numBroadcastTasks = 2;
-    int numOneToOneTasks = 3;
-    if (doLocalityCheck) {
-      YarnClient yarnClient = YarnClient.createYarnClient();
-      yarnClient.init(tezConf);
-      yarnClient.start();
-      int numNMs = yarnClient.getNodeReports(NodeState.RUNNING).size();
-      yarnClient.stop();
-      // create enough 1-1 tasks to run in parallel
-      numOneToOneTasks = numNMs - numBroadcastTasks - 1; // 1 AM
-      if (numOneToOneTasks < 1) {
-        numOneToOneTasks = 1;
-      }
-    }
-    byte[] procByte = {(byte) (doLocalityCheck ? 1 : 0), 1};
-    UserPayload procPayload = new UserPayload(procByte);
-
-    System.out.println("Using " + numOneToOneTasks + " 1-1 tasks");
-
-    Vertex broadcastVertex = new Vertex("Broadcast", new ProcessorDescriptor(
-        InputProcessor.class.getName()), numBroadcastTasks);
-    
-    Vertex inputVertex = new Vertex("Input", new ProcessorDescriptor(
-        InputProcessor.class.getName()).setUserPayload(procPayload), numOneToOneTasks);
-
-    Vertex oneToOneVertex = new Vertex("OneToOne",
-        new ProcessorDescriptor(
-            OneToOneProcessor.class.getName()).setUserPayload(procPayload));
-    oneToOneVertex.setVertexManagerPlugin(
-            new VertexManagerPluginDescriptor(InputReadyVertexManager.class.getName()));
-
-    UnorderedUnpartitionedKVEdgeConfigurer edgeConf = UnorderedUnpartitionedKVEdgeConfigurer
-        .newBuilder(Text.class.getName(), IntWritable.class.getName()).build();
-
-    DAG dag = new DAG("BroadcastAndOneToOneExample");
-    dag.addVertex(inputVertex)
-        .addVertex(broadcastVertex)
-        .addVertex(oneToOneVertex)
-        .addEdge(
-            new Edge(inputVertex, oneToOneVertex, edgeConf.createDefaultOneToOneEdgeProperty()))
-        .addEdge(
-            new Edge(broadcastVertex, oneToOneVertex,
-                edgeConf.createDefaultBroadcastEdgeProperty()));
-    return dag;
-  }
-  
-  public boolean run(Configuration conf, boolean doLocalityCheck) throws Exception {
-    System.out.println("Running BroadcastAndOneToOneExample");
-    // conf and UGI
-    TezConfiguration tezConf;
-    if (conf != null) {
-      tezConf = new TezConfiguration(conf);
-    } else {
-      tezConf = new TezConfiguration();
-    }
-    tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, true);
-    UserGroupInformation.setConfiguration(tezConf);
-    String user = UserGroupInformation.getCurrentUser().getShortUserName();
-
-    // staging dir
-    FileSystem fs = FileSystem.get(tezConf);
-    String stagingDirStr = Path.SEPARATOR + "user" + Path.SEPARATOR
-        + user + Path.SEPARATOR + ".staging" + Path.SEPARATOR
-        + Long.toString(System.currentTimeMillis());
-    Path stagingDir = new Path(stagingDirStr);
-    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirStr);
-    stagingDir = fs.makeQualified(stagingDir);
-    
-    // No need to add the jar containing this class; it is assumed to be part of
-    // the tez jars.
-
-    // TEZ-674 Obtain tokens based on the Input / Output paths. For now assuming staging dir
-    // is the same filesystem as the one used for Input/Output.
-    // A session is needed, or else the TaskScheduler does not hold onto containers.
-    TezClient tezSession = new TezClient("broadcastAndOneToOneExample", tezConf);
-    tezSession.start();
-
-    DAGClient dagClient = null;
-
-    try {
-        DAG dag = createDAG(fs, tezConf, stagingDir, doLocalityCheck);
-
-        tezSession.waitTillReady();
-        dagClient = tezSession.submitDAG(dag);
-
-        // monitoring
-        DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(null);
-        if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
-          System.out.println("DAG diagnostics: " + dagStatus.getDiagnostics());
-          return false;
-        }
-        return true;
-    } finally {
-      fs.delete(stagingDir, true);
-      tezSession.stop();
-    }
-  }
-  
-  @Override
-  public int run(String[] args) throws Exception {
-    boolean doLocalityCheck = true;
-    if (args.length == 1) {
-      if (args[0].equals(skipLocalityCheck)) {
-        doLocalityCheck = false;
-      } else {
-        printUsage();
-        throw new TezException("Invalid command line");
-      }
-    } else if (args.length > 1) {
-      printUsage();
-      throw new TezException("Invalid command line");
-    }
-    boolean status = run(getConf(), doLocalityCheck);
-    return status ? 0 : 1;
-  }
-  
-  private static void printUsage() {
-    System.err.println("broadcastAndOneToOneExample " + skipLocalityCheck);
-    ToolRunner.printGenericCommandUsage(System.err);
-  }
-  
-  static String skipLocalityCheck = "-skipLocalityCheck";
-
-  public static void main(String[] args) throws Exception {
-    Configuration conf = new Configuration();
-    BroadcastAndOneToOneExample job = new BroadcastAndOneToOneExample();
-    int status = ToolRunner.run(conf, job, args);
-    System.exit(status);
-  }
-}
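
The heart of the deleted example above is the DAG wiring: one producer vertex feeds the consumer over a broadcast edge while a second feeds it over a 1-1 edge. A minimal sketch of just that wiring, assuming the same constructor-style API used in this diff; vertex names and task counts are illustrative, and the consumer's parallelism is left for the runtime to derive from the 1-1 source as in the original:

package org.apache.tez.mapreduce.examples;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
import org.apache.tez.runtime.library.conf.UnorderedUnpartitionedKVEdgeConfigurer;

public class BroadcastPlusOneToOneSketch {

  static DAG buildDag() {
    // "Broadcast" fans its output out to every consumer task; "Input" pairs
    // up with consumer tasks one-to-one.
    Vertex broadcast = new Vertex("Broadcast", new ProcessorDescriptor(
        BroadcastAndOneToOneExample.InputProcessor.class.getName()), 2);
    Vertex input = new Vertex("Input", new ProcessorDescriptor(
        BroadcastAndOneToOneExample.InputProcessor.class.getName()), 3);

    // No explicit parallelism: with a 1-1 edge it is derived from the source.
    // InputReadyVertexManager schedules consumer tasks as inputs become ready.
    Vertex consumer = new Vertex("OneToOne", new ProcessorDescriptor(
        BroadcastAndOneToOneExample.OneToOneProcessor.class.getName()));
    consumer.setVertexManagerPlugin(
        new VertexManagerPluginDescriptor(InputReadyVertexManager.class.getName()));

    UnorderedUnpartitionedKVEdgeConfigurer edgeConf = UnorderedUnpartitionedKVEdgeConfigurer
        .newBuilder(Text.class.getName(), IntWritable.class.getName()).build();

    return new DAG("BroadcastAndOneToOneSketch")
        .addVertex(broadcast).addVertex(input).addVertex(consumer)
        .addEdge(new Edge(input, consumer, edgeConf.createDefaultOneToOneEdgeProperty()))
        .addEdge(new Edge(broadcast, consumer, edgeConf.createDefaultBroadcastEdgeProperty()));
  }
}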

http://git-wip-us.apache.org/repos/asf/tez/blob/41f5cd8a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
deleted file mode 100644
index f4f617a..0000000
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.mapreduce.examples;
-
-import java.io.IOException;
-import java.text.DecimalFormat;
-import java.util.EnumSet;
-import java.util.Set;
-
-import org.apache.hadoop.util.ProgramDriver;
-import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.dag.api.client.Progress;
-import org.apache.tez.dag.api.client.StatusGetOpts;
-import org.apache.tez.dag.api.client.VertexStatus;
-import org.apache.tez.examples.OrderedWordCount;
-import org.apache.tez.examples.SimpleSessionExample;
-import org.apache.tez.examples.WordCount;
-import org.apache.tez.mapreduce.examples.terasort.TeraGen;
-import org.apache.tez.mapreduce.examples.terasort.TeraSort;
-import org.apache.tez.mapreduce.examples.terasort.TeraValidate;
-
-/**
- * A description of an example program based on its class and a
- * human-readable description.
- */
-public class ExampleDriver {
-
-  private static final DecimalFormat formatter = new DecimalFormat("###.##%");
-
-  public static void main(String argv[]){
-    int exitCode = -1;
-    ProgramDriver pgd = new ProgramDriver();
-    try {
-      pgd.addClass("wordcount", WordCount.class,
-          "A map/reduce program that counts the words in the input files.");
-      pgd.addClass("mapredwordcount", MapredWordCount.class,
-          "A map/reduce program that counts the words in the input files"
-         + " using the mapred apis.");
-      pgd.addClass("randomwriter", RandomWriter.class,
-          "A map/reduce program that writes 10GB of random data per node.");
-      pgd.addClass("randomtextwriter", RandomTextWriter.class,
-      "A map/reduce program that writes 10GB of random textual data per node.");
-      pgd.addClass("sort", Sort.class,
-          "A map/reduce program that sorts the data written by the random"
-          + " writer.");
-      pgd.addClass("secondarysort", SecondarySort.class,
-          "An example defining a secondary sort to the reduce.");
-      pgd.addClass("join", Join.class,
-          "A job that effects a join over sorted, equally partitioned"
-          + " datasets");
-      pgd.addClass("teragen", TeraGen.class,
-          "Generate data for the terasort");
-      pgd.addClass("terasort", TeraSort.class,
-          "Run the terasort");
-      pgd.addClass("teravalidate", TeraValidate.class,
-          "Checking results of terasort");
-      pgd.addClass("groupbyorderbymrrtest", GroupByOrderByMRRTest.class,
-          "A map-reduce-reduce program that does groupby-order by. Takes input"
-          + " containing employee_name department name per line of input"
-          + " and generates count of employees per department and"
-          + " sorted on employee count");
-      pgd.addClass("mrrsleep", MRRSleepJob.class,
-          "MRR Sleep Job");
-      pgd.addClass("orderedwordcount", OrderedWordCount.class,
-          "Word Count with words sorted on frequency");
-      pgd.addClass("simplesessionexample", SimpleSessionExample.class,
-          "Example to run multiple dags in a session");
-      pgd.addClass("testorderedwordcount", TestOrderedWordCount.class,
-          "Word Count with words sorted on frequency");
-      pgd.addClass("unionexample", UnionExample.class,
-          "Union example");
-      pgd.addClass("broadcastAndOneToOneExample", BroadcastAndOneToOneExample.class,
-          "BroadcastAndOneToOneExample example");
-      pgd.addClass("filterLinesByWord", FilterLinesByWord.class,
-          "Filters lines by the specified word using broadcast edge");
-      pgd.addClass("filterLinesByWordOneToOne", FilterLinesByWordOneToOne.class,
-          "Filters lines by the specified word using OneToOne edge");
-      pgd.addClass("intersect", IntersectExample.class,
-          "Identify all occurences of lines in file1 which also occur in file2");
-      pgd.addClass("intersectdatagen", IntersectDataGen.class,
-          "Generate data to run the intersect example");
-      pgd.addClass("intersectvalidate", IntersectValidate.class,
-          "Validate data generated by intersect and intersectdatagen");
-      exitCode = pgd.run(argv);
-    }
-    catch(Throwable e){
-      e.printStackTrace();
-    }
-
-    System.exit(exitCode);
-  }
-
-  public static void printDAGStatus(DAGClient dagClient, String[] vertexNames)
-      throws IOException, TezException {
-    printDAGStatus(dagClient, vertexNames, false, false);
-  }
-
-  public static void printDAGStatus(DAGClient dagClient, String[] vertexNames,
-      boolean displayDAGCounters, boolean displayVertexCounters)
-      throws IOException, TezException {
-    Set<StatusGetOpts> opts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
-    DAGStatus dagStatus = dagClient.getDAGStatus(
-      (displayDAGCounters ? opts : null));
-    Progress progress = dagStatus.getDAGProgress();
-    double vProgressFloat = 0.0f;
-    if (progress != null) {
-      System.out.println("");
-      System.out.println("DAG: State: "
-          + dagStatus.getState()
-          + " Progress: "
-          + (progress.getTotalTaskCount() < 0 ? formatter.format(0.0f) :
-            formatter.format((double)(progress.getSucceededTaskCount())
-              /progress.getTotalTaskCount())));
-      for (String vertexName : vertexNames) {
-        VertexStatus vStatus = dagClient.getVertexStatus(vertexName,
-          (displayVertexCounters ? opts : null));
-        if (vStatus == null) {
-          System.out.println("Could not retrieve status for vertex: "
-            + vertexName);
-          continue;
-        }
-        Progress vProgress = vStatus.getProgress();
-        if (vProgress != null) {
-          vProgressFloat = 0.0f;
-          if (vProgress.getTotalTaskCount() == 0) {
-            vProgressFloat = 1.0f;
-          } else if (vProgress.getTotalTaskCount() > 0) {
-            vProgressFloat = (double)vProgress.getSucceededTaskCount()
-              /vProgress.getTotalTaskCount();
-          }
-          System.out.println("VertexStatus:"
-              + " VertexName: "
-              + (vertexName.equals("ivertex1") ? "intermediate-reducer"
-                  : vertexName)
-              + " Progress: " + formatter.format(vProgressFloat));
-        }
-        if (displayVertexCounters) {
-          TezCounters counters = vStatus.getVertexCounters();
-          if (counters != null) {
-            System.out.println("Vertex Counters for " + vertexName + ": "
-              + counters);
-          }
-        }
-      }
-    }
-    if (displayDAGCounters) {
-      TezCounters counters = dagStatus.getDAGCounters();
-      if (counters != null) {
-        System.out.println("DAG Counters: " + counters);
-      }
-    }
-  }
-
-}
-	
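
ExampleDriver itself is a thin registry around Hadoop's ProgramDriver: each addClass() call binds a command-line keyword to an example class and a description, and run() dispatches to the matching class's main() with the remaining arguments. The pattern in isolation, as a sketch (registering a single class here just for illustration):

import org.apache.hadoop.util.ProgramDriver;
import org.apache.tez.examples.WordCount;

public class DriverSketch {
  public static void main(String[] args) {
    int exitCode = -1;
    ProgramDriver pgd = new ProgramDriver();
    try {
      // Bind the keyword "wordcount" to the WordCount example class.
      pgd.addClass("wordcount", WordCount.class,
          "A map/reduce program that counts the words in the input files.");
      // Strips the keyword and invokes the matching main() with the rest of args.
      exitCode = pgd.run(args);
    } catch (Throwable t) {
      t.printStackTrace();
    }
    System.exit(exitCode);
  }
}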

http://git-wip-us.apache.org/repos/asf/tez/blob/41f5cd8a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
deleted file mode 100644
index 02a3202..0000000
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
+++ /dev/null
@@ -1,297 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.mapreduce.examples;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.UUID;
-
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.util.ClassUtil;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.tez.client.TezClientUtils;
-import org.apache.tez.client.TezClient;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.DataSinkDescriptor;
-import org.apache.tez.dag.api.DataSourceDescriptor;
-import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.OutputCommitterDescriptor;
-import org.apache.tez.dag.api.OutputDescriptor;
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.dag.api.client.StatusGetOpts;
-import org.apache.tez.mapreduce.committer.MROutputCommitter;
-import org.apache.tez.mapreduce.examples.helpers.SplitsInClientOptionParser;
-import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
-import org.apache.tez.mapreduce.input.MRInputLegacy;
-import org.apache.tez.mapreduce.output.MROutput;
-import org.apache.tez.processor.FilterByWordInputProcessor;
-import org.apache.tez.processor.FilterByWordOutputProcessor;
-import org.apache.tez.runtime.library.conf.UnorderedUnpartitionedKVEdgeConfigurer;
-
-import com.google.common.collect.Sets;
-
-public class FilterLinesByWord extends Configured implements Tool {
-
-  private static Log LOG = LogFactory.getLog(FilterLinesByWord.class);
-
-  public static final String FILTER_PARAM_NAME = "tez.runtime.examples.filterbyword.word";
-  
-  private boolean exitOnCompletion = false;
-
-  public FilterLinesByWord(boolean exitOnCompletion) {
-    this.exitOnCompletion = exitOnCompletion;
-  }
-  
-  private static void printUsage() {
-    System.err.println("Usage filtelinesrbyword <in> <out> <filter_word> [-generateSplitsInClient true/<false>]");
-    ToolRunner.printGenericCommandUsage(System.err);
-  }
-
-  @Override
-  public int run(String[] args) throws Exception {
-    Configuration conf = getConf();
-    String [] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
-    Credentials credentials = new Credentials();
-
-    boolean generateSplitsInClient = false;
-
-    SplitsInClientOptionParser splitCmdLineParser = new SplitsInClientOptionParser();
-    try {
-      generateSplitsInClient = splitCmdLineParser.parse(otherArgs, false);
-      otherArgs = splitCmdLineParser.getRemainingArgs();
-    } catch (ParseException e1) {
-      System.err.println("Invalid options");
-      printUsage();
-      return 2;
-    }
-
-    if (otherArgs.length != 3) {
-      printUsage();
-      return 2;
-    }
-
-    String inputPath = otherArgs[0];
-    String outputPath = otherArgs[1];
-    String filterWord = otherArgs[2];
-
-    FileSystem fs = FileSystem.get(conf);
-    if (fs.exists(new Path(outputPath))) {
-      System.err.println("Output directory : " + outputPath + " already exists");
-      return 2;
-    }
-
-    TezConfiguration tezConf = new TezConfiguration(conf);
-
-    Path stagingDir = new Path(fs.getWorkingDirectory(), UUID.randomUUID().toString());
-    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDir.toString());
-    TezClientUtils.ensureStagingDirExists(tezConf, stagingDir);
-
-    String jarPath = ClassUtil.findContainingJar(FilterLinesByWord.class);
-    if (jarPath == null) {
-      throw new TezUncheckedException("Could not find any jar containing "
-          + FilterLinesByWord.class.getName() + " in the classpath");
-    }
-
-    Path remoteJarPath = fs.makeQualified(new Path(stagingDir, "dag_job.jar"));
-    fs.copyFromLocalFile(new Path(jarPath), remoteJarPath);
-    FileStatus remoteJarStatus = fs.getFileStatus(remoteJarPath);
-    TokenCache.obtainTokensForNamenodes(credentials, new Path[]{remoteJarPath}, conf);
-
-    Map<String, LocalResource> commonLocalResources = new TreeMap<String, LocalResource>();
-    LocalResource dagJarLocalRsrc = LocalResource.newInstance(
-        ConverterUtils.getYarnUrlFromPath(remoteJarPath),
-        LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
-        remoteJarStatus.getLen(), remoteJarStatus.getModificationTime());
-    commonLocalResources.put("dag_job.jar", dagJarLocalRsrc);
-
-
-
-    TezClient tezSession = new TezClient("FilterLinesByWordSession", tezConf, 
-        commonLocalResources, credentials);
-    tezSession.start(); // The session must be started before DAGs can be submitted.
-
-    Configuration stage1Conf = new JobConf(conf);
-    stage1Conf.set(FILTER_PARAM_NAME, filterWord);
-
-    Configuration stage2Conf = new JobConf(conf);
-    stage2Conf.set(FileOutputFormat.OUTDIR, outputPath);
-    stage2Conf.setBoolean("mapred.mapper.new-api", false);
-
-    UserPayload stage1Payload = TezUtils.createUserPayloadFromConf(stage1Conf);
-    // Setup stage1 Vertex
-    Vertex stage1Vertex = new Vertex("stage1", new ProcessorDescriptor(
-        FilterByWordInputProcessor.class.getName()).setUserPayload(stage1Payload))
-        .setTaskLocalFiles(commonLocalResources);
-
-    DataSourceDescriptor dsd;
-    if (generateSplitsInClient) {
-      // TODO TEZ-1406. Don't use MRInputLegacy
-      stage1Conf.set(FileInputFormat.INPUT_DIR, inputPath);
-      stage1Conf.setBoolean("mapred.mapper.new-api", false);
-      dsd = MRInputHelpers.configureMRInputWithLegacySplitGeneration(stage1Conf, stagingDir, true);
-    } else {
-      dsd = MRInputLegacy.createConfigurer(stage1Conf, TextInputFormat.class, inputPath)
-          .groupSplits(false).create();
-    }
-    stage1Vertex.addDataSource("MRInput", dsd);
-
-    // Setup stage2 Vertex
-    Vertex stage2Vertex = new Vertex("stage2", new ProcessorDescriptor(
-        FilterByWordOutputProcessor.class.getName()).setUserPayload(
-        TezUtils.createUserPayloadFromConf(stage2Conf)), 1);
-    stage2Vertex.setTaskLocalFiles(commonLocalResources);
-
-    // Configure the Output for stage2
-    OutputDescriptor od = new OutputDescriptor(MROutput.class.getName())
-        .setUserPayload(TezUtils.createUserPayloadFromConf(stage2Conf));
-    OutputCommitterDescriptor ocd = new OutputCommitterDescriptor(MROutputCommitter.class.getName());
-    stage2Vertex.addDataSink("MROutput", new DataSinkDescriptor(od, ocd, null));
-
-    UnorderedUnpartitionedKVEdgeConfigurer edgeConf = UnorderedUnpartitionedKVEdgeConfigurer
-        .newBuilder(Text.class.getName(), TextLongPair.class.getName()).build();
-
-    DAG dag = new DAG("FilterLinesByWord");
-    Edge edge = new Edge(stage1Vertex, stage2Vertex, edgeConf.createDefaultBroadcastEdgeProperty());
-    dag.addVertex(stage1Vertex).addVertex(stage2Vertex).addEdge(edge);
-
-    LOG.info("Submitting DAG to Tez Session");
-    DAGClient dagClient = tezSession.submitDAG(dag);
-    LOG.info("Submitted DAG to Tez Session");
-
-    DAGStatus dagStatus = null;
-    String[] vNames = { "stage1", "stage2" };
-    try {
-      while (true) {
-        dagStatus = dagClient.getDAGStatus(null);
-        if(dagStatus.getState() == DAGStatus.State.RUNNING ||
-            dagStatus.getState() == DAGStatus.State.SUCCEEDED ||
-            dagStatus.getState() == DAGStatus.State.FAILED ||
-            dagStatus.getState() == DAGStatus.State.KILLED ||
-            dagStatus.getState() == DAGStatus.State.ERROR) {
-          break;
-        }
-        try {
-          Thread.sleep(500);
-        } catch (InterruptedException e) {
-          // continue;
-        }
-      }
-
-      while (dagStatus.getState() == DAGStatus.State.RUNNING) {
-        try {
-          ExampleDriver.printDAGStatus(dagClient, vNames);
-          try {
-            Thread.sleep(1000);
-          } catch (InterruptedException e) {
-            // continue;
-          }
-          dagStatus = dagClient.getDAGStatus(null);
-        } catch (TezException e) {
-          LOG.fatal("Failed to get application progress. Exiting");
-          return -1;
-        }
-      }
-      
-      dagStatus = dagClient.getDAGStatus(Sets.newHashSet(StatusGetOpts.GET_COUNTERS));
-      
-    } finally {
-      fs.delete(stagingDir, true);
-      tezSession.stop();
-    }
-
-    ExampleDriver.printDAGStatus(dagClient, vNames, true, true);
-    LOG.info("Application completed. " + "FinalState=" + dagStatus.getState());
-    return dagStatus.getState() == DAGStatus.State.SUCCEEDED ? 0 : 1;
-  }
-  
-  public static void main(String[] args) throws Exception {
-    FilterLinesByWord fl = new FilterLinesByWord(true);
-    int status = ToolRunner.run(new Configuration(), fl, args);
-    if (fl.exitOnCompletion) {
-      System.exit(status);
-    }
-  }
-
-  public static class TextLongPair implements Writable {
-
-    private Text text;
-    private LongWritable longWritable;
-
-    public TextLongPair() {
-    }
-
-    public TextLongPair(Text text, LongWritable longWritable) {
-      this.text = text;
-      this.longWritable = longWritable;
-    }
-
-    @Override
-    public void write(DataOutput out) throws IOException {
-      this.text.write(out);
-      this.longWritable.write(out);
-    }
-
-    @Override
-    public void readFields(DataInput in) throws IOException {
-      this.text = new Text();
-      this.longWritable = new LongWritable();
-      text.readFields(in);
-      longWritable.readFields(in);
-    }
-
-    @Override
-    public String toString() {
-      return text.toString() + "\t" + longWritable.get();
-    }
-  }
-}
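
One reusable piece buried in the middle of FilterLinesByWord.run() is the jar-localization step: find the jar that contains the job classes on the local classpath, copy it into the staging directory, and register it as a YARN LocalResource so each task container can fetch it. A condensed sketch of that step, using only the calls that appear in the diff above (the resource name "dag_job.jar" is just this example's convention):

import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ClassUtil;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.util.ConverterUtils;

public class JarLocalizationSketch {

  static Map<String, LocalResource> localizeJar(FileSystem fs, Path stagingDir,
      Class<?> jobClass) throws IOException {
    // Locate the jar on the local classpath that contains the job class.
    String jarPath = ClassUtil.findContainingJar(jobClass);
    if (jarPath == null) {
      throw new IOException("Could not find any jar containing "
          + jobClass.getName() + " in the classpath");
    }
    // Copy it into the staging directory on the cluster filesystem.
    Path remoteJar = fs.makeQualified(new Path(stagingDir, "dag_job.jar"));
    fs.copyFromLocalFile(new Path(jarPath), remoteJar);
    FileStatus remoteJarStatus = fs.getFileStatus(remoteJar);
    // Length and modification time let NodeManagers validate the download.
    LocalResource dagJarLocalRsrc = LocalResource.newInstance(
        ConverterUtils.getYarnUrlFromPath(remoteJar),
        LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
        remoteJarStatus.getLen(), remoteJarStatus.getModificationTime());
    Map<String, LocalResource> localResources = new TreeMap<String, LocalResource>();
    localResources.put("dag_job.jar", dagJarLocalRsrc);
    return localResources;
  }
}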

http://git-wip-us.apache.org/repos/asf/tez/blob/41f5cd8a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
deleted file mode 100644
index b413993..0000000
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.mapreduce.examples;
-
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.UUID;
-
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.util.ClassUtil;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.tez.client.TezClientUtils;
-import org.apache.tez.client.TezClient;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.DataSinkDescriptor;
-import org.apache.tez.dag.api.DataSourceDescriptor;
-import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.OutputCommitterDescriptor;
-import org.apache.tez.dag.api.OutputDescriptor;
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.mapreduce.committer.MROutputCommitter;
-import org.apache.tez.mapreduce.examples.FilterLinesByWord.TextLongPair;
-import org.apache.tez.mapreduce.examples.helpers.SplitsInClientOptionParser;
-import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
-import org.apache.tez.mapreduce.input.MRInputLegacy;
-import org.apache.tez.mapreduce.output.MROutput;
-import org.apache.tez.processor.FilterByWordInputProcessor;
-import org.apache.tez.processor.FilterByWordOutputProcessor;
-import org.apache.tez.runtime.library.conf.UnorderedUnpartitionedKVEdgeConfigurer;
-
-public class FilterLinesByWordOneToOne extends Configured implements Tool {
-
-  private static Log LOG = LogFactory.getLog(FilterLinesByWordOneToOne.class);
-
-  public static final String FILTER_PARAM_NAME = "tez.runtime.examples.filterbyword.word";
-
-  private static void printUsage() {
-    System.err.println("Usage filterLinesByWordOneToOne <in> <out> <filter_word>" 
-        + " [-generateSplitsInClient true/<false>]");
-    ToolRunner.printGenericCommandUsage(System.err);
-  }
-
-  public static void main(String[] args) throws Exception {
-    Configuration conf = new Configuration();
-    String[] otherArgs = new GenericOptionsParser(conf, args)
-        .getRemainingArgs();
-    int status = ToolRunner.run(conf, new FilterLinesByWordOneToOne(),
-        otherArgs);
-    System.exit(status);
-  }
-
-  @Override
-  public int run(String[] otherArgs) throws Exception {
-    boolean generateSplitsInClient = false;
-    SplitsInClientOptionParser splitCmdLineParser = new SplitsInClientOptionParser();
-    try {
-      generateSplitsInClient = splitCmdLineParser.parse(otherArgs, false);
-      otherArgs = splitCmdLineParser.getRemainingArgs();
-    } catch (ParseException e1) {
-      System.err.println("Invalid options");
-      printUsage();
-      return 2;
-    }
-
-    if (otherArgs.length != 3) {
-      printUsage();
-      return 2;
-    }
-
-    String inputPath = otherArgs[0];
-    String outputPath = otherArgs[1];
-    String filterWord = otherArgs[2];
-    
-    Configuration conf = getConf();
-    FileSystem fs = FileSystem.get(conf);
-    if (fs.exists(new Path(outputPath))) {
-      System.err.println("Output directory : " + outputPath + " already exists");
-      return 2;
-    }
-
-    TezConfiguration tezConf = new TezConfiguration(conf);
-
-    Path stagingDir = new Path(fs.getWorkingDirectory(), UUID.randomUUID().toString());
-    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDir.toString());
-    TezClientUtils.ensureStagingDirExists(tezConf, stagingDir);
-
-    String jarPath = ClassUtil.findContainingJar(FilterLinesByWordOneToOne.class);
-    if (jarPath == null) {
-      throw new TezUncheckedException("Could not find any jar containing "
-          + FilterLinesByWordOneToOne.class.getName() + " in the classpath");
-    }
-
-    Path remoteJarPath = fs.makeQualified(new Path(stagingDir, "dag_job.jar"));
-    fs.copyFromLocalFile(new Path(jarPath), remoteJarPath);
-    FileStatus remoteJarStatus = fs.getFileStatus(remoteJarPath);
-
-    Map<String, LocalResource> commonLocalResources = new TreeMap<String, LocalResource>();
-    LocalResource dagJarLocalRsrc = LocalResource.newInstance(
-        ConverterUtils.getYarnUrlFromPath(remoteJarPath),
-        LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
-        remoteJarStatus.getLen(), remoteJarStatus.getModificationTime());
-    commonLocalResources.put("dag_job.jar", dagJarLocalRsrc);
-
-
-
-    TezClient tezSession = new TezClient("FilterLinesByWordSession", tezConf,
-        commonLocalResources, null);
-    tezSession.start(); // The session must be started before DAGs can be submitted.
-
-    Configuration stage1Conf = new JobConf(conf);
-    stage1Conf.set(FILTER_PARAM_NAME, filterWord);
-
-    Configuration stage2Conf = new JobConf(conf);
-
-    stage2Conf.set(FileOutputFormat.OUTDIR, outputPath);
-    stage2Conf.setBoolean("mapred.mapper.new-api", false);
-
-    UserPayload stage1Payload = TezUtils.createUserPayloadFromConf(stage1Conf);
-    // Setup stage1 Vertex
-    Vertex stage1Vertex = new Vertex("stage1", new ProcessorDescriptor(
-        FilterByWordInputProcessor.class.getName()).setUserPayload(stage1Payload))
-        .setTaskLocalFiles(commonLocalResources);
-
-    DataSourceDescriptor dsd;
-    if (generateSplitsInClient) {
-      // TODO TEZ-1406. Don't use MRInputLegacy
-      stage1Conf.set(FileInputFormat.INPUT_DIR, inputPath);
-      stage1Conf.setBoolean("mapred.mapper.new-api", false);
-      dsd = MRInputHelpers.configureMRInputWithLegacySplitGeneration(stage1Conf, stagingDir, true);
-    } else {
-      dsd = MRInputLegacy.createConfigurer(stage1Conf, TextInputFormat.class, inputPath)
-          .groupSplits(false).create();
-    }
-    stage1Vertex.addDataSource("MRInput", dsd);
-
-    // Setup stage2 Vertex
-    Vertex stage2Vertex = new Vertex("stage2", new ProcessorDescriptor(
-        FilterByWordOutputProcessor.class.getName()).setUserPayload(TezUtils
-        .createUserPayloadFromConf(stage2Conf)), dsd.getNumberOfShards());
-    stage2Vertex.setTaskLocalFiles(commonLocalResources);
-
-    // Configure the Output for stage2
-    stage2Vertex.addDataSink(
-        "MROutput",
-        new DataSinkDescriptor(new OutputDescriptor(MROutput.class.getName())
-            .setUserPayload(TezUtils.createUserPayloadFromConf(stage2Conf)),
-            new OutputCommitterDescriptor(MROutputCommitter.class.getName()), null));
-
-    UnorderedUnpartitionedKVEdgeConfigurer edgeConf = UnorderedUnpartitionedKVEdgeConfigurer
-        .newBuilder(Text.class.getName(), TextLongPair.class.getName()).build();
-
-    DAG dag = new DAG("FilterLinesByWord");
-    Edge edge = new Edge(stage1Vertex, stage2Vertex, edgeConf.createDefaultOneToOneEdgeProperty());
-    dag.addVertex(stage1Vertex).addVertex(stage2Vertex).addEdge(edge);
-
-    LOG.info("Submitting DAG to Tez Session");
-    DAGClient dagClient = tezSession.submitDAG(dag);
-    LOG.info("Submitted DAG to Tez Session");
-
-    DAGStatus dagStatus = null;
-    String[] vNames = { "stage1", "stage2" };
-    try {
-      while (true) {
-        dagStatus = dagClient.getDAGStatus(null);
-        if(dagStatus.getState() == DAGStatus.State.RUNNING ||
-            dagStatus.getState() == DAGStatus.State.SUCCEEDED ||
-            dagStatus.getState() == DAGStatus.State.FAILED ||
-            dagStatus.getState() == DAGStatus.State.KILLED ||
-            dagStatus.getState() == DAGStatus.State.ERROR) {
-          break;
-        }
-        try {
-          Thread.sleep(500);
-        } catch (InterruptedException e) {
-          // continue;
-        }
-      }
-
-      while (dagStatus.getState() == DAGStatus.State.RUNNING) {
-        try {
-          ExampleDriver.printDAGStatus(dagClient, vNames);
-          try {
-            Thread.sleep(1000);
-          } catch (InterruptedException e) {
-            // continue;
-          }
-          dagStatus = dagClient.getDAGStatus(null);
-        } catch (TezException e) {
-          LOG.fatal("Failed to get application progress. Exiting");
-          return -1;
-        }
-      }
-    } finally {
-      fs.delete(stagingDir, true);
-      tezSession.stop();
-    }
-
-    ExampleDriver.printDAGStatus(dagClient, vNames);
-    LOG.info("Application completed. " + "FinalState=" + dagStatus.getState());
-    return dagStatus.getState() == DAGStatus.State.SUCCEEDED ? 0 : 1;
-  }
-}
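
Both filter examples, and GroupByOrderByMRRTest below, repeat the same two polling loops: first wait until the DAG reaches RUNNING or a terminal state, then keep polling while it is RUNNING. Factored into a helper, the pattern looks roughly like this (a sketch against the same DAGClient/DAGStatus API as above; the sleep intervals mirror the originals):

import java.io.IOException;

import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;

public class PollStatusSketch {

  // Wait until the DAG reaches RUNNING or a terminal state, then keep
  // polling while it is RUNNING; returns the last status observed.
  static DAGStatus waitForTerminalState(DAGClient dagClient)
      throws IOException, TezException {
    DAGStatus status;
    while (true) {
      status = dagClient.getDAGStatus(null);
      DAGStatus.State s = status.getState();
      if (s == DAGStatus.State.RUNNING || s == DAGStatus.State.SUCCEEDED
          || s == DAGStatus.State.FAILED || s == DAGStatus.State.KILLED
          || s == DAGStatus.State.ERROR) {
        break;
      }
      sleepQuietly(500);
    }
    while (status.getState() == DAGStatus.State.RUNNING) {
      sleepQuietly(1000);
      status = dagClient.getDAGStatus(null);
    }
    return status;
  }

  private static void sleepQuietly(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      // keep polling; the examples above ignore interrupts the same way
    }
  }
}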

http://git-wip-us.apache.org/repos/asf/tez/blob/41f5cd8a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/GroupByOrderByMRRTest.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/GroupByOrderByMRRTest.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/GroupByOrderByMRRTest.java
deleted file mode 100644
index 939bea0..0000000
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/GroupByOrderByMRRTest.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.mapreduce.examples;
-
-import java.io.IOException;
-import java.util.StringTokenizer;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.TypeConverter;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.tez.client.MRTezClient;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
-
-/**
- * Simple example that does a GROUP BY ORDER BY in an MRR job.
- * Consider a query such as:
- * Select DeptName, COUNT(*) as cnt FROM EmployeeTable
- * GROUP BY DeptName ORDER BY cnt;
- *
- * i.e. List all departments with count of employees in each department
- * and ordered based on department's employee count.
- *
- *  Requires an input file containing two strings per line in the format
- *  <EmployeeName> <DeptName>
- *
- *  For example, use the following:
- *
- *  #!/bin/bash
- *
- *  i=1000000
- *  j=1000
- *
- *  id=0
- *  while [[ "$id" -ne "$i" ]]
- *  do
- *    id=`expr $id + 1`
- *    deptId=`expr $RANDOM % $j + 1`
- *    deptName=`echo "ibase=10;obase=16;$deptId" | bc`
- *    echo "$id O$deptName"
- *  done
- *
- */
-public class GroupByOrderByMRRTest extends Configured implements Tool {
-
-  private static final Log LOG = LogFactory.getLog(GroupByOrderByMRRTest.class);
-
-  /**
-   * Mapper takes in a single line as input containing
-   * employee name and department name and then
-   * emits department name with count of 1
-   */
-  public static class MyMapper
-      extends Mapper<Object, Text, Text, IntWritable> {
-
-    private final static IntWritable one = new IntWritable(1);
-    private final static Text word = new Text();
-
-    public void map(Object key, Text value, Context context
-        ) throws IOException, InterruptedException {
-      StringTokenizer itr = new StringTokenizer(value.toString());
-      String empName = "";
-      String deptName = "";
-      if (itr.hasMoreTokens()) {
-        empName = itr.nextToken();
-        if (itr.hasMoreTokens()) {
-          deptName = itr.nextToken();
-        }
-        if (!empName.isEmpty()
-            && !deptName.isEmpty()) {
-          word.set(deptName);
-          context.write(word, one);
-        }
-      }
-    }
-  }
-
-  /**
-   * Intermediate reducer aggregates the total count per department.
-   * It takes department name and count as input and emits the final
-   * count per department name.
-   */
-  public static class MyGroupByReducer
-      extends Reducer<Text, IntWritable, IntWritable, Text> {
-    private IntWritable result = new IntWritable();
-
-    public void reduce(Text key, Iterable<IntWritable> values,
-        Context context
-        ) throws IOException, InterruptedException {
-
-      int sum = 0;
-      for (IntWritable val : values) {
-        sum += val.get();
-      }
-      result.set(sum);
-      context.write(result, key);
-    }
-  }
-
-  /**
-   * Shuffle ensures ordering based on count of employees per department
-   * hence the final reducer is a no-op and just emits the department name
-   * with the employee count per department.
-   */
-  public static class MyOrderByNoOpReducer
-      extends Reducer<IntWritable, Text, Text, IntWritable> {
-
-    public void reduce(IntWritable key, Iterable<Text> values,
-        Context context
-        ) throws IOException, InterruptedException {
-      for (Text word : values) {
-        context.write(word, key);
-      }
-    }
-  }
-
-  @Override
-  public int run(String[] args) throws Exception {
-    Configuration conf = getConf();
-
-    // Configure intermediate reduces
-    conf.setInt(MRJobConfig.MRR_INTERMEDIATE_STAGES, 1);
-
-    // Set reducer class for intermediate reduce
-    conf.setClass(MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(1,
-        "mapreduce.job.reduce.class"), MyGroupByReducer.class, Reducer.class);
-    // Set reducer output key class
-    conf.setClass(MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(1,
-        "mapreduce.map.output.key.class"), IntWritable.class, Object.class);
-    // Set reducer output value class
-    conf.setClass(MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(1,
-        "mapreduce.map.output.value.class"), Text.class, Object.class);
-    conf.setInt(MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(1,
-        "mapreduce.job.reduces"), 2);
-
-    String[] otherArgs = new GenericOptionsParser(conf, args).
-        getRemainingArgs();
-    if (otherArgs.length != 2) {
-      System.err.println("Usage: groupbyorderbymrrtest <in> <out>");
-      ToolRunner.printGenericCommandUsage(System.err);
-      return 2;
-    }
-
-    @SuppressWarnings("deprecation")
-    Job job = new Job(conf, "groupbyorderbymrrtest");
-
-    job.setJarByClass(GroupByOrderByMRRTest.class);
-
-    // Configure map
-    job.setMapperClass(MyMapper.class);
-    job.setMapOutputKeyClass(Text.class);
-    job.setMapOutputValueClass(IntWritable.class);
-
-    // Configure reduce
-    job.setReducerClass(MyOrderByNoOpReducer.class);
-    job.setOutputKeyClass(Text.class);
-    job.setOutputValueClass(IntWritable.class);
-    job.setNumReduceTasks(1);
-
-    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
-    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
-
-    job.submit();
-    JobID jobId = job.getJobID();
-    ApplicationId appId = TypeConverter.toYarn(jobId).getAppId();
-
-    DAGClient dagClient = MRTezClient.getDAGClient(appId, new TezConfiguration(conf), null);
-    DAGStatus dagStatus;
-    String[] vNames = { "initialmap" , "ireduce1" , "finalreduce" };
-    while (true) {
-      dagStatus = dagClient.getDAGStatus(null);
-      if(dagStatus.getState() == DAGStatus.State.RUNNING ||
-         dagStatus.getState() == DAGStatus.State.SUCCEEDED ||
-         dagStatus.getState() == DAGStatus.State.FAILED ||
-         dagStatus.getState() == DAGStatus.State.KILLED ||
-         dagStatus.getState() == DAGStatus.State.ERROR) {
-        break;
-      }
-      try {
-        Thread.sleep(500);
-      } catch (InterruptedException e) {
-        // continue;
-      }
-    }
-
-    while (dagStatus.getState() == DAGStatus.State.RUNNING) {
-      try {
-        ExampleDriver.printDAGStatus(dagClient, vNames);
-        try {
-          Thread.sleep(1000);
-        } catch (InterruptedException e) {
-          // continue;
-        }
-        dagStatus = dagClient.getDAGStatus(null);
-      } catch (TezException e) {
-        LOG.fatal("Failed to get application progress. Exiting");
-        return -1;
-      }
-    }
-
-    ExampleDriver.printDAGStatus(dagClient, vNames);
-    LOG.info("Application completed. " + "FinalState=" + dagStatus.getState());
-    return dagStatus.getState() == DAGStatus.State.SUCCEEDED ? 0 : 1;
-  }
-
-  public static void main(String[] args) throws Exception {
-    Configuration configuration = new Configuration();
-    GroupByOrderByMRRTest groupByOrderByMRRTest = new GroupByOrderByMRRTest();
-    int status = ToolRunner.run(configuration, groupByOrderByMRRTest, args);
-    System.exit(status);
-  }
-}
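
The one non-obvious step in GroupByOrderByMRRTest is attaching a Tez DAGClient to a plain MapReduce submission: the MR JobID is converted to its backing YARN ApplicationId, which identifies the Tez AM that MRTezClient can talk to. That bridge in isolation, as a sketch using the same calls as the code above:

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.client.MRTezClient;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.client.DAGClient;

public class MrToDagClientSketch {

  static DAGClient submitAndAttach(Job job, TezConfiguration tezConf) throws Exception {
    job.submit();                 // asynchronous submission
    JobID jobId = job.getJobID(); // MR-side identifier for the submission
    // The YARN application backing the MR job hosts the Tez AM.
    ApplicationId appId = TypeConverter.toYarn(jobId).getAppId();
    return MRTezClient.getDAGClient(appId, tezConf, null);
  }
}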

http://git-wip-us.apache.org/repos/asf/tez/blob/41f5cd8a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
deleted file mode 100644
index 711240c..0000000
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
+++ /dev/null
@@ -1,350 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.mapreduce.examples;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.Random;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.tez.client.TezClient;
-import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.mapreduce.output.MROutput;
-import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
-import org.apache.tez.runtime.api.ProcessorContext;
-import org.apache.tez.runtime.library.api.KeyValueWriter;
-
-import com.google.common.base.Preconditions;
-
-public class IntersectDataGen extends Configured implements Tool {
-
-  private static final Log LOG = LogFactory.getLog(IntersectDataGen.class);
-
-  private static final String STREAM_OUTPUT_NAME = "streamoutput";
-  private static final String HASH_OUTPUT_NAME = "hashoutput";
-  private static final String EXPECTED_OUTPUT_NAME = "expectedoutput";
-
-  public static void main(String[] args) throws Exception {
-    IntersectDataGen dataGen = new IntersectDataGen();
-    int status = ToolRunner.run(new Configuration(), dataGen, args);
-    System.exit(status);
-  }
-
-  private static void printUsage() {
-    System.err
-        .println("Usage: "
-            + "intersectdatagen <outPath1> <path1Size> <outPath2> <path2Size> <expectedResultPath> <numTasks>");
-    ToolRunner.printGenericCommandUsage(System.err);
-  }
-
-  @Override
-  public int run(String[] args) throws Exception {
-    Configuration conf = getConf();
-    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
-    int result = validateArgs(otherArgs);
-    if (result != 0) {
-      return result;
-    }
-    return execute(otherArgs);
-  }
-  
-  public int run(Configuration conf, String[] args, TezClient tezSession) throws Exception {
-    setConf(conf);
-    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
-    int result = validateArgs(otherArgs);
-    if (result != 0) {
-      return result;
-    }
-    return execute(otherArgs, tezSession);
-  }
-  
-  private int validateArgs(String[] otherArgs) {
-    if (otherArgs.length != 6) {
-      printUsage();
-      return 2;
-    }
-    return 0;
-  }
-
-  private int execute(String [] args) throws TezException, IOException, InterruptedException {
-    TezConfiguration tezConf = new TezConfiguration(getConf());
-    TezClient tezSession = null;
-    try {
-      tezSession = createTezSession(tezConf);
-      return execute(args, tezConf, tezSession);
-    } finally {
-      if (tezSession != null) {
-        tezSession.stop();
-      }
-    }
-  }
-  
-  private int execute(String[] args, TezClient tezSession) throws IOException, TezException,
-      InterruptedException {
-    TezConfiguration tezConf = new TezConfiguration(getConf());
-    return execute(args, tezConf, tezSession);
-  }
-  
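-  // The example runs in Tez session mode: a caller can hand in an existing TezClient via
-  // the three-argument run() above to reuse one app master across DAGs, while standalone
-  // runs create and stop a session of their own.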
-  private TezClient createTezSession(TezConfiguration tezConf) throws TezException, IOException {
-    TezClient tezSession = new TezClient("IntersectDataGenSession", tezConf);
-    tezSession.start();
-    return tezSession;
-  }
-  
-  private int execute(String[] args, TezConfiguration tezConf, TezClient tezSession)
-      throws IOException, TezException, InterruptedException {
-    LOG.info("Running IntersectDataGen");
-
-    UserGroupInformation.setConfiguration(tezConf);
-
-    String outDir1 = args[0];
-    long outDir1Size = Long.parseLong(args[1]);
-    String outDir2 = args[2];
-    long outDir2Size = Long.parseLong(args[3]);
-    String expectedOutputDir = args[4];
-    int numTasks = Integer.parseInt(args[5]);
-
-    Path largeOutPath = null;
-    Path smallOutPath = null;
-    long largeOutSize = 0;
-    long smallOutSize = 0;
-
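-    // Normalize the two requested outputs so largeOutPath always refers to the larger file.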
-    if (outDir1Size >= outDir2Size) {
-      largeOutPath = new Path(outDir1);
-      largeOutSize = outDir1Size;
-      smallOutPath = new Path(outDir2);
-      smallOutSize = outDir2Size;
-    } else {
-      largeOutPath = new Path(outDir2);
-      largeOutSize = outDir2Size;
-      smallOutPath = new Path(outDir1);
-      smallOutSize = outDir1Size;
-    }
-
-    Path expectedOutputPath = new Path(expectedOutputDir);
-
-    // Verify output path existence
-    FileSystem fs = FileSystem.get(tezConf);
-    int res = checkOutputDirectory(fs, largeOutPath) + checkOutputDirectory(fs, smallOutPath)
-        + checkOutputDirectory(fs, expectedOutputPath);
-    if (res != 0) {
-      return 3;
-    }
-
-    if (numTasks <= 0) {
-      System.err.println("NumTasks must be > 0");
-      return 4;
-    }
-
-    DAG dag = createDag(tezConf, largeOutPath, smallOutPath, expectedOutputPath, numTasks,
-        largeOutSize, smallOutSize);
-
-    tezSession.waitTillReady();
-    DAGClient dagClient = tezSession.submitDAG(dag);
-    DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(null);
-    if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
-      LOG.info("DAG diagnostics: " + dagStatus.getDiagnostics());
-      return -1;
-    }
-    return 0;
-  }
-
-  private DAG createDag(TezConfiguration tezConf, Path largeOutPath, Path smallOutPath,
-      Path expectedOutputPath, int numTasks, long largeOutSize, long smallOutSize)
-      throws IOException {
-
-    long largeOutSizePerTask = largeOutSize / numTasks;
-    long smallOutSizePerTask = smallOutSize / numTasks;
-
-    DAG dag = new DAG("IntersectDataGen");
-
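-    // A single generator vertex with three MROutput sinks: the stream file, the hash file,
-    // and the expected intersection; each of the numTasks tasks produces its slice of all three.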
-    Vertex genDataVertex = new Vertex("datagen", new ProcessorDescriptor(
-        GenDataProcessor.class.getName()).setUserPayload(
-        new UserPayload(GenDataProcessor.createConfiguration(largeOutSizePerTask, smallOutSizePerTask))), numTasks);
-    genDataVertex.addDataSink(STREAM_OUTPUT_NAME, 
-        MROutput.createConfigurer(new Configuration(tezConf),
-            TextOutputFormat.class, largeOutPath.toUri().toString()).create());
-    genDataVertex.addDataSink(HASH_OUTPUT_NAME, 
-        MROutput.createConfigurer(new Configuration(tezConf),
-            TextOutputFormat.class, smallOutPath.toUri().toString()).create());
-    genDataVertex.addDataSink(EXPECTED_OUTPUT_NAME, 
-        MROutput.createConfigurer(new Configuration(tezConf),
-            TextOutputFormat.class, expectedOutputPath.toUri().toString()).create());
-
-    dag.addVertex(genDataVertex);
-
-    return dag;
-  }
-
-  public static class GenDataProcessor extends SimpleMRProcessor {
-
-    private static final Log LOG = LogFactory.getLog(GenDataProcessor.class);
-
-    long streamOutputFileSize;
-    long hashOutputFileSize;
-    float overlapApprox = 0.2f;
-
-    public GenDataProcessor(ProcessorContext context) {
-      super(context);
-    }
-
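-    // The two target sizes travel to the tasks as the vertex user payload: two longs
-    // written back to back here and read back in initialize().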
-    public static byte[] createConfiguration(long streamOutputFileSize, long hashOutputFileSize)
-        throws IOException {
-      ByteArrayOutputStream bos = new ByteArrayOutputStream();
-      DataOutputStream dos = new DataOutputStream(bos);
-      dos.writeLong(streamOutputFileSize);
-      dos.writeLong(hashOutputFileSize);
-      dos.close();
-      bos.close();
-      return bos.toByteArray();
-    }
-
-    @Override
-    public void initialize() throws Exception {
-      byte[] payload = getContext().getUserPayload().getPayload();
-      ByteArrayInputStream bis = new ByteArrayInputStream(payload);
-      DataInputStream dis = new DataInputStream(bis);
-      streamOutputFileSize = dis.readLong();
-      hashOutputFileSize = dis.readLong();
-      LOG.info("Initialized with largeFileTargetSize=" + streamOutputFileSize
-          + ", smallFileTragetSize=" + hashOutputFileSize);
-      dis.close();
-      bis.close();
-    }
-
-    @Override
-    public void run() throws Exception {
-      Preconditions.checkState(getInputs().size() == 0);
-      Preconditions.checkState(getOutputs().size() == 3);
-
-      KeyValueWriter streamOutputWriter = (KeyValueWriter) getOutputs().get(STREAM_OUTPUT_NAME)
-          .getWriter();
-      KeyValueWriter hashOutputWriter = (KeyValueWriter) getOutputs().get(HASH_OUTPUT_NAME)
-          .getWriter();
-      KeyValueWriter expectedOutputWriter = (KeyValueWriter) getOutputs().get(EXPECTED_OUTPUT_NAME)
-          .getWriter();
-
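-      // fileSizeFraction is the hash (small) file's target size relative to the stream
-      // (large) file; at most ~overlapApprox of the stream keys are duplicated into the
-      // hash file, with any remainder of the hash file padded by non-overlapping keys.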
-      float fileSizeFraction = hashOutputFileSize / (float) streamOutputFileSize;
-      Preconditions.checkState(fileSizeFraction > 0.0f && fileSizeFraction <= 1.0f);
-      int mod = 1;
-      int extraKeysMod = 0;
-      if (fileSizeFraction > overlapApprox) {
-        // Common keys are capped by the overlap fraction; the hash file is padded with
-        // additional non-overlapping keys to reach its target size.
-        mod = (int) (1 / overlapApprox);
-        extraKeysMod = (int) (1 / (fileSizeFraction - overlapApprox));
-      } else {
-        // All keys in hashFile must exist in stream file.
-        mod = (int) (1 / fileSizeFraction);
-      }
-      LOG.info("Using mod=" + mod + ", extraKeysMod=" + extraKeysMod);
-
-      long count = 0;
-      long sizeLarge = 0;
-      long sizeSmall = 0;
-      long numLargeFileKeys = 0;
-      long numSmallFileKeys = 0;
-      long numExpectedKeys = 0;
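-      // Every mod-th key goes to all three outputs and is therefore part of the expected
-      // intersection; every extraKeysMod-th key also gets an uppercase twin written only
-      // to the hash output, padding it without growing the intersection.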
-      while (sizeLarge < streamOutputFileSize) {
-        String str = createOverlapString(13, count);
-        Text text = new Text(str);
-        int size = text.getLength();
-        streamOutputWriter.write(text, NullWritable.get());
-        sizeLarge += size;
-        numLargeFileKeys++;
-        if (count % mod == 0) {
-          hashOutputWriter.write(text, NullWritable.get());
-          sizeSmall += size;
-          numSmallFileKeys++;
-          expectedOutputWriter.write(text, NullWritable.get());
-          numExpectedKeys++;
-        }
-        if (extraKeysMod != 0 && count % extraKeysMod == 0) {
-          String nStr = createNonOverlapString(13, count);
-          Text nText = new Text(nStr);
-          hashOutputWriter.write(nText, NullWritable.get());
-          sizeSmall += nText.getLength();
-          numSmallFileKeys++;
-        }
-        count++;
-      }
-      LOG.info("OutputStats: " + "largeFileNumKeys=" + numLargeFileKeys + ", smallFileNumKeys="
-          + numSmallFileKeys + ", expFileNumKeys=" + numExpectedKeys + ", largeFileSize="
-          + sizeLarge + ", smallFileSize=" + sizeSmall);
-    }
-
-    private String createOverlapString(int size, long count) {
-      StringBuilder sb = new StringBuilder();
-      Random random = new Random();
-      for (int i = 0; i < size; i++) {
-        int r = random.nextInt(26); // nextInt(bound) avoids the Math.abs(Integer.MIN_VALUE) edge case
-        // Random a-z followed by the count
-        sb.append((char) (97 + r));
-      }
-      sb.append("_").append(getContext().getTaskIndex()).append("_").append(count);
-      return sb.toString();
-    }
-
-    private String createNonOverlapString(int size, long count) {
-      StringBuilder sb = new StringBuilder();
-      Random random = new Random();
-      for (int i = 0; i < size; i++) {
-        int r = random.nextInt(26); // as above, avoids a possible negative from Math.abs
-        // Random A-Z followed by the count
-        sb.append((char) (65 + r));
-      }
-      sb.append("_").append(getContext().getTaskIndex()).append("_").append(count);
-      return sb.toString();
-    }
-
-  }
-
-  private int checkOutputDirectory(FileSystem fs, Path path) throws IOException {
-    if (fs.exists(path)) {
-      System.err.println("Output directory: " + path + " already exists");
-      return 2;
-    }
-    return 0;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/41f5cd8a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
deleted file mode 100644
index da18014..0000000
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
+++ /dev/null
@@ -1,281 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.mapreduce.examples;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.tez.client.TezClient;
-import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.mapreduce.input.MRInput;
-import org.apache.tez.mapreduce.output.MROutput;
-import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
-import org.apache.tez.runtime.api.LogicalInput;
-import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.Reader;
-import org.apache.tez.runtime.api.ProcessorContext;
-import org.apache.tez.runtime.library.api.KeyValueReader;
-import org.apache.tez.runtime.library.api.KeyValueWriter;
-import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfigurer;
-import org.apache.tez.runtime.library.partitioner.HashPartitioner;
-import org.apache.tez.runtime.library.processor.SimpleProcessor;
-
-import com.google.common.base.Preconditions;
-
-public class IntersectExample extends Configured implements Tool {
-
-  private static final Log LOG = LogFactory.getLog(IntersectExample.class);
-
-  public static void main(String[] args) throws Exception {
-    IntersectExample intersect = new IntersectExample();
-    int status = ToolRunner.run(new Configuration(), intersect, args);
-    System.exit(status);
-  }
-
-  private static void printUsage() {
-    System.err.println("Usage: " + "intersect <file1> <file2> <numPartitions> <outPath>");
-    ToolRunner.printGenericCommandUsage(System.err);
-  }
-
-  @Override
-  public int run(String[] args) throws Exception {
-    Configuration conf = getConf();
-    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
-    int result = validateArgs(otherArgs);
-    if (result != 0) {
-      return result;
-    }
-    return execute(otherArgs);
-  }
-  
-  public int run(Configuration conf, String[] args, TezClient tezSession) throws Exception {
-    setConf(conf);
-    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
-    int result = validateArgs(otherArgs);
-    if (result != 0) {
-      return result;
-    }
-    return execute(otherArgs, tezSession);
-  }
-  
-  private int validateArgs(String[] otherArgs) {
-    if (otherArgs.length != 4) {
-      printUsage();
-      return 2;
-    }
-    return 0;
-  }
-
-  private int execute(String[] args) throws TezException, IOException, InterruptedException {
-    TezConfiguration tezConf = new TezConfiguration(getConf());
-    TezClient tezSession = null;
-    try {
-      tezSession = createTezSession(tezConf);
-      return execute(args, tezConf, tezSession);
-    } finally {
-      if (tezSession != null) {
-        tezSession.stop();
-      }
-    }
-  }
-  
-  private int execute(String[] args, TezClient tezSession) throws IOException, TezException,
-      InterruptedException {
-    TezConfiguration tezConf = new TezConfiguration(getConf());
-    return execute(args, tezConf, tezSession);
-  }
-  
-  private TezClient createTezSession(TezConfiguration tezConf) throws TezException, IOException {
-    TezClient tezSession = new TezClient("IntersectExampleSession", tezConf);
-    tezSession.start();
-    return tezSession;
-  }
-  
-  private int execute(String[] args, TezConfiguration tezConf, TezClient tezSession)
-      throws IOException, TezException, InterruptedException {
-    LOG.info("Running IntersectExample");
-
-    UserGroupInformation.setConfiguration(tezConf);
-
-    String streamInputDir = args[0];
-    String hashInputDir = args[1];
-    int numPartitions = Integer.parseInt(args[2]);
-    String outputDir = args[3];
-
-    Path streamInputPath = new Path(streamInputDir);
-    Path hashInputPath = new Path(hashInputDir);
-    Path outputPath = new Path(outputDir);
-
-    // Verify output path existence
-    FileSystem fs = FileSystem.get(tezConf);
-    if (fs.exists(outputPath)) {
-      System.err.println("Output directory: " + outputDir + " already exists");
-      return 3;
-    }
-    if (numPartitions <= 0) {
-      System.err.println("NumPartitions must be > 0");
-      return 4;
-    }
-
-    DAG dag = createDag(tezConf, streamInputPath, hashInputPath, outputPath, numPartitions);
-
-    tezSession.waitTillReady();
-    DAGClient dagClient = tezSession.submitDAG(dag);
-    DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(null);
-    if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
-      LOG.info("DAG diagnostics: " + dagStatus.getDiagnostics());
-      return -1;
-    }
-    return 0;
-  }
-
-  private DAG createDag(TezConfiguration tezConf, Path streamPath, Path hashPath, Path outPath,
-      int numPartitions) throws IOException {
-    DAG dag = new DAG("IntersectExample");
-
-    // Configuration for intermediate output - shared by Vertex1 and Vertex2
-    // This should only be setting selective keys from the underlying conf. Fix after there's a
-    // better mechanism to configure the IOs.
-
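-    // An unordered, partitioned edge is sufficient here: IntersectProcessor joins via an
-    // in-memory hash set, so the inputs only need to be co-partitioned by key, not sorted.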
-    UnorderedPartitionedKVEdgeConfigurer edgeConf =
-        UnorderedPartitionedKVEdgeConfigurer
-            .newBuilder(Text.class.getName(), NullWritable.class.getName(),
-                HashPartitioner.class.getName()).build();
-
-    // Change the way resources are setup - no MRHelpers
-    Vertex streamFileVertex = new Vertex("partitioner1", new ProcessorDescriptor(
-        ForwardingProcessor.class.getName())).addDataSource(
-        "streamfile",
-        MRInput
-            .createConfigurer(new Configuration(tezConf), TextInputFormat.class,
-                streamPath.toUri().toString()).groupSplits(false).create());
-
-    Vertex hashFileVertex = new Vertex("partitioner2", new ProcessorDescriptor(
-        ForwardingProcessor.class.getName())).addDataSource(
-        "hashfile",
-        MRInput
-            .createConfigurer(new Configuration(tezConf), TextInputFormat.class,
-                hashPath.toUri().toString()).groupSplits(false).create());
-
-    Vertex intersectVertex = new Vertex("intersect", new ProcessorDescriptor(
-        IntersectProcessor.class.getName()), numPartitions).addDataSink("finalOutput",
-        MROutput.createConfigurer(new Configuration(tezConf),
-            TextOutputFormat.class, outPath.toUri().toString()).create());
-
-    Edge e1 = new Edge(streamFileVertex, intersectVertex, edgeConf.createDefaultEdgeProperty());
-
-    Edge e2 = new Edge(hashFileVertex, intersectVertex, edgeConf.createDefaultEdgeProperty());
-
-    dag.addVertex(streamFileVertex).addVertex(hashFileVertex).addVertex(intersectVertex)
-        .addEdge(e1).addEdge(e2);
-    return dag;
-  }
-
-  // private void obtainTokens(Credentials credentials, Path... paths) throws IOException {
-  // TokenCache.obtainTokensForNamenodes(credentials, paths, getConf());
-  // }
-
-  /**
-   * Reads key-values from the source and forwards the value as the key for the output
-   */
-  public static class ForwardingProcessor extends SimpleProcessor {
-    public ForwardingProcessor(ProcessorContext context) {
-      super(context);
-    }
-
-    @Override
-    public void run() throws Exception {
-      Preconditions.checkState(getInputs().size() == 1);
-      Preconditions.checkState(getOutputs().size() == 1);
-      LogicalInput input = getInputs().values().iterator().next();
-      Reader rawReader = input.getReader();
-      Preconditions.checkState(rawReader instanceof KeyValueReader);
-      LogicalOutput output = getOutputs().values().iterator().next();
-
-      KeyValueReader reader = (KeyValueReader) rawReader;
-      KeyValueWriter writer = (KeyValueWriter) output.getWriter();
-
-      while (reader.next()) {
-        Object val = reader.getCurrentValue();
-        writer.write(val, NullWritable.get());
-      }
-    }
-  }
-
-  public static class IntersectProcessor extends SimpleMRProcessor {
-
-    public IntersectProcessor(ProcessorContext context) {
-      super(context);
-    }
-
-    @Override
-    public void run() throws Exception {
-      Preconditions.checkState(getInputs().size() == 2);
-      Preconditions.checkState(getOutputs().size() == 1);
-      LogicalInput streamInput = getInputs().get("partitioner1");
-      LogicalInput hashInput = getInputs().get("partitioner2");
-      Reader rawStreamReader = streamInput.getReader();
-      Reader rawHashReader = hashInput.getReader();
-      Preconditions.checkState(rawStreamReader instanceof KeyValueReader);
-      Preconditions.checkState(rawHashReader instanceof KeyValueReader);
-      LogicalOutput lo = getOutputs().values().iterator().next();
-      Preconditions.checkState(lo instanceof MROutput);
-      MROutput output = (MROutput) lo;
-      KeyValueWriter writer = output.getWriter();
-
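-      // Hash-join style intersection: load the hash-side keys into a set, then stream the
-      // other input and emit only the keys found in the set.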
-      KeyValueReader hashKvReader = (KeyValueReader) rawHashReader;
-      Set<Text> keySet = new HashSet<Text>();
-      while (hashKvReader.next()) {
-        keySet.add(new Text((Text) hashKvReader.getCurrentKey()));
-      }
-
-      KeyValueReader streamKvReader = (KeyValueReader) rawStreamReader;
-      while (streamKvReader.next()) {
-        Text key = (Text) streamKvReader.getCurrentKey();
-        if (keySet.contains(key)) {
-          writer.write(key, NullWritable.get());
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/41f5cd8a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
deleted file mode 100644
index c5328c7..0000000
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.mapreduce.examples;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.tez.client.TezClient;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.dag.api.client.StatusGetOpts;
-import org.apache.tez.mapreduce.examples.IntersectExample.ForwardingProcessor;
-import org.apache.tez.mapreduce.input.MRInput;
-import org.apache.tez.runtime.api.LogicalInput;
-import org.apache.tez.runtime.api.Reader;
-import org.apache.tez.runtime.api.ProcessorContext;
-import org.apache.tez.runtime.library.api.KeyValuesReader;
-import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfigurer;
-import org.apache.tez.runtime.library.partitioner.HashPartitioner;
-import org.apache.tez.runtime.library.processor.SimpleProcessor;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
-
-public class IntersectValidate extends Configured implements Tool {
-  private static final Log LOG = LogFactory.getLog(IntersectValidate.class);
-
-  private static final String LHS_INPUT_NAME = "lhsfile";
-  private static final String RHS_INPUT_NAME = "rhsfile";
-
-  private static final String COUNTER_GROUP_NAME = "INTERSECT_VALIDATE";
-  private static final String MISSING_KEY_COUNTER_NAME = "MISSING_KEY_EXISTS";
-
-  public static void main(String[] args) throws Exception {
-    IntersectValidate validate = new IntersectValidate();
-    int status = ToolRunner.run(new Configuration(), validate, args);
-    System.exit(status);
-  }
-
-  private static void printUsage() {
-    System.err.println("Usage: " + "intersectvalidate <path1> <path2>");
-    ToolRunner.printGenericCommandUsage(System.err);
-  }
-
-  @Override
-  public int run(String[] args) throws Exception {
-    Configuration conf = getConf();
-    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
-    int result = validateArgs(otherArgs);
-    if (result != 0) {
-      return result;
-    }
-    return execute(otherArgs);
-  }
-  
-  public int run(Configuration conf, String[] args, TezClient tezSession) throws Exception {
-    setConf(conf);
-    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
-    int result = validateArgs(otherArgs);
-    if (result != 0) {
-      return result;
-    }
-    return execute(otherArgs, tezSession);
-  } 
-
-  private int validateArgs(String[] otherArgs) {
-    if (otherArgs.length != 3 && otherArgs.length != 2) {
-      printUsage();
-      return 2;
-    }
-    return 0;
-  }
-
-  private int execute(String[] args) throws TezException, IOException, InterruptedException {
-    TezConfiguration tezConf = new TezConfiguration(getConf());
-    TezClient tezSession = null;
-    try {
-      tezSession = createTezSession(tezConf);
-      return execute(args, tezConf, tezSession);
-    } finally {
-      if (tezSession != null) {
-        tezSession.stop();
-      }
-    }
-  }
-  
-  private int execute(String[] args, TezClient tezSession) throws IOException, TezException,
-      InterruptedException {
-    TezConfiguration tezConf = new TezConfiguration(getConf());
-    return execute(args, tezConf, tezSession);
-  }
-  
-  private TezClient createTezSession(TezConfiguration tezConf) throws TezException, IOException {
-    TezClient tezSession = new TezClient("IntersectValidateSession", tezConf);
-    tezSession.start();
-    return tezSession;
-  }
-
-  private int execute(String[] args, TezConfiguration tezConf, TezClient tezSession)
-      throws IOException, TezException, InterruptedException {
-    LOG.info("Running IntersectValidate");
-    UserGroupInformation.setConfiguration(tezConf);
-
-    String lhsDir = args[0];
-    String rhsDir = args[1];
-    int numPartitions = 1;
-    if (args.length == 3) {
-      numPartitions = Integer.parseInt(args[2]);
-    }
-
-    if (numPartitions <= 0) {
-      System.err.println("NumPartitions must be > 0");
-      return 4;
-    }
-
-    Path lhsPath = new Path(lhsDir);
-    Path rhsPath = new Path(rhsDir);
-
-    DAG dag = createDag(tezConf, lhsPath, rhsPath, numPartitions);
-
-    tezSession.waitTillReady();
-    DAGClient dagClient = tezSession.submitDAG(dag);
-    DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(null);
-    if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
-      LOG.info("DAG diagnostics: " + dagStatus.getDiagnostics());
-      return -1;
-    } else {
-      dagStatus = dagClient.getDAGStatus(Sets.newHashSet(StatusGetOpts.GET_COUNTERS));
-      TezCounter counter = dagStatus.getDAGCounters().findCounter(COUNTER_GROUP_NAME,
-          MISSING_KEY_COUNTER_NAME);
-      if (counter == null) {
-        LOG.info("Unable to determing equality");
-        return -2;
-      } else {
-        if (counter.getValue() != 0) {
-          LOG.info("Validate failed. The two sides are not equivalent");
-          return -3;
-        } else {
-          LOG.info("Validation successful. The two sides are equivalent");
-          return 0;
-        }
-      }
-    }
-  }
-
-  private DAG createDag(TezConfiguration tezConf, Path lhs, Path rhs, int numPartitions)
-      throws IOException {
-    DAG dag = new DAG("IntersectValidate");
-
-    // Configuration for intermediate output - shared by Vertex1 and Vertex2
-    // This should only be setting selective keys from the underlying conf. Fix after there's a
-    // better mechanism to configure the IOs.
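-    // Unlike IntersectExample, this uses an ordered edge: both inputs arrive at the
-    // validator sorted by key, allowing a lock-step comparison of the two streams.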
-    OrderedPartitionedKVEdgeConfigurer edgeConf = OrderedPartitionedKVEdgeConfigurer
-        .newBuilder(Text.class.getName(), NullWritable.class.getName(),
-            HashPartitioner.class.getName()).build();
-
-    Vertex lhsVertex = new Vertex(LHS_INPUT_NAME, new ProcessorDescriptor(
-        ForwardingProcessor.class.getName())).addDataSource("lhs",
-        MRInput
-            .createConfigurer(new Configuration(tezConf), TextInputFormat.class,
-                lhs.toUri().toString()).groupSplits(false).create());
-
-    Vertex rhsVertex = new Vertex(RHS_INPUT_NAME, new ProcessorDescriptor(
-        ForwardingProcessor.class.getName())).addDataSource("rhs",
-        MRInput
-            .createConfigurer(new Configuration(tezConf), TextInputFormat.class,
-                rhs.toUri().toString()).groupSplits(false).create());
-
-    Vertex intersectValidateVertex = new Vertex("intersectvalidate", new ProcessorDescriptor(
-        IntersectValidateProcessor.class.getName()), numPartitions);
-
-    Edge e1 = new Edge(lhsVertex, intersectValidateVertex, edgeConf.createDefaultEdgeProperty());
-    Edge e2 = new Edge(rhsVertex, intersectValidateVertex, edgeConf.createDefaultEdgeProperty());
-
-    dag.addVertex(lhsVertex).addVertex(rhsVertex).addVertex(intersectValidateVertex).addEdge(e1)
-        .addEdge(e2);
-    return dag;
-  }
-
-  public static class IntersectValidateProcessor extends SimpleProcessor {
-
-    private static final Log LOG = LogFactory.getLog(IntersectValidateProcessor.class);
-
-    public IntersectValidateProcessor(ProcessorContext context) {
-      super(context);
-    }
-
-    @Override
-    public void run() throws Exception {
-      Preconditions.checkState(getInputs().size() == 2);
-      Preconditions.checkState(getOutputs().size() == 0);
-      LogicalInput lhsInput = getInputs().get(LHS_INPUT_NAME);
-      LogicalInput rhsInput = getInputs().get(RHS_INPUT_NAME);
-      Reader lhsReaderRaw = lhsInput.getReader();
-      Reader rhsReaderRaw = rhsInput.getReader();
-      Preconditions.checkState(lhsReaderRaw instanceof KeyValuesReader);
-      Preconditions.checkState(rhsReaderRaw instanceof KeyValuesReader);
-      KeyValuesReader lhsReader = (KeyValuesReader) lhsReaderRaw;
-      KeyValuesReader rhsReader = (KeyValuesReader) rhsReaderRaw;
-
-      TezCounter lhsMissingKeyCounter = getContext().getCounters().findCounter(COUNTER_GROUP_NAME,
-          MISSING_KEY_COUNTER_NAME);
-
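-      // Both inputs are sorted by key (ordered edge), so walking the two readers in
-      // lock-step detects any mismatch; the counter flags the difference for the driver.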
-      while (lhsReader.next()) {
-        if (rhsReader.next()) {
-          if (!lhsReader.getCurrentKey().equals(rhsReader.getCurrentKey())) {
-            LOG.info("MismatchedKeys: " + "lhs=" + lhsReader.getCurrentKey() + ", rhs=" + rhsReader.getCurrentKey());
-            lhsMissingKeyCounter.increment(1);
-          }
-        } else {
-          lhsMissingKeyCounter.increment(1);
-          LOG.info("ExtraKey in lhs: " + lhsReader.getClass());
-          break;
-        }
-      }
-      if (rhsReader.next()) {
-        lhsMissingKeyCounter.increment(1);
-        LOG.info("ExtraKey in rhs: " + lhsReader.getClass());
-      }
-    }
-  }
-
-}

