tez-commits mailing list archives

From bi...@apache.org
Subject [02/10] TEZ-1055. Rename tez-mapreduce-examples to tez-examples (Hitesh Shah via bikas)
Date Sat, 16 Aug 2014 00:54:50 GMT
http://git-wip-us.apache.org/repos/asf/tez/blob/41f5cd8a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
new file mode 100644
index 0000000..72c14fc
--- /dev/null
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
@@ -0,0 +1,479 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.examples;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.StringTokenizer;
+import java.util.TreeMap;
+
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.client.PreWarmVertex;
+import org.apache.tez.client.TezClientUtils;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DataSourceDescriptor;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.StatusGetOpts;
+import org.apache.tez.mapreduce.examples.helpers.SplitsInClientOptionParser;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.tez.mapreduce.output.MROutputLegacy;
+import org.apache.tez.mapreduce.processor.map.MapProcessor;
+import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfigurer;
+import org.apache.tez.runtime.library.partitioner.HashPartitioner;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * An MRR job built on top of word count to return words sorted by
+ * their frequency of occurrence.
+ *
+ * Use -DUSE_TEZ_SESSION=true to run the jobs in session mode.
+ * If multiple input/output pairs are provided, each pair is processed as a
+ * separate DAG, in sequence.
+ * Use -DINTER_JOB_SLEEP_INTERVAL=<N>, where N is the sleep interval in
+ * seconds between the sequential DAGs.
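+ *
+ * Illustrative invocation (a sketch; the exact jar name depends on the
+ * build and is an assumption here):
+ *   hadoop jar tez-tests-<version>.jar testorderedwordcount \
+ *     -DUSE_TEZ_SESSION=true -DINTER_JOB_SLEEP_INTERVAL=5 in1 out1 in2 out2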
+ */
+public class TestOrderedWordCount extends Configured implements Tool {
+
+  private static final Log LOG = LogFactory.getLog(TestOrderedWordCount.class);
+
+  public static class TokenizerMapper
+       extends Mapper<Object, Text, Text, IntWritable>{
+
+    private final static IntWritable one = new IntWritable(1);
+    private Text word = new Text();
+
+    public void map(Object key, Text value, Context context
+                    ) throws IOException, InterruptedException {
+      StringTokenizer itr = new StringTokenizer(value.toString());
+      while (itr.hasMoreTokens()) {
+        word.set(itr.nextToken());
+        context.write(word, one);
+      }
+    }
+  }
+
+  public static class IntSumReducer
+       extends Reducer<Text, IntWritable, IntWritable, Text> {
+    private IntWritable result = new IntWritable();
+
+    public void reduce(Text key, Iterable<IntWritable> values,
+                       Context context
+                       ) throws IOException, InterruptedException {
+      int sum = 0;
+      for (IntWritable val : values) {
+        sum += val.get();
+      }
+      result.set(sum);
+      context.write(result, key);
+    }
+  }
+
+  /**
+   * The shuffle into this stage orders the words by their counts, so this
+   * final reducer is a no-op that simply emits each word with its count in
+   * sorted order.
+   */
+  public static class MyOrderByNoOpReducer
+      extends Reducer<IntWritable, Text, Text, IntWritable> {
+
+    public void reduce(IntWritable key, Iterable<Text> values,
+        Context context
+        ) throws IOException, InterruptedException {
+      for (Text word : values) {
+        context.write(word, key);
+      }
+    }
+  }
+
+  private Credentials credentials = new Credentials();
+
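+  /**
+   * Builds one three-vertex MRR DAG: "initialmap" (TokenizerMapper) feeds
+   * "intermediate_reducer" (IntSumReducer, 2 tasks), which in turn feeds
+   * "finalreduce" (MyOrderByNoOpReducer, 1 task) writing to outputPath.
+   */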
+  @VisibleForTesting
+  public DAG createDAG(FileSystem fs, Configuration conf,
+      Map<String, LocalResource> commonLocalResources, Path stagingDir,
+      int dagIndex, String inputPath, String outputPath,
+      boolean generateSplitsInClient) throws Exception {
+
+    Configuration mapStageConf = new JobConf(conf);
+    mapStageConf.set(MRJobConfig.MAP_CLASS_ATTR,
+        TokenizerMapper.class.getName());
+
+    MRHelpers.translateMRConfToTez(mapStageConf);
+
+    Configuration iReduceStageConf = new JobConf(conf);
+    // TODO replace with auto-reduce parallelism
+    iReduceStageConf.setInt(MRJobConfig.NUM_REDUCES, 2);
+    iReduceStageConf.set(MRJobConfig.REDUCE_CLASS_ATTR,
+        IntSumReducer.class.getName());
+    iReduceStageConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName());
+    iReduceStageConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS,
+        IntWritable.class.getName());
+    iReduceStageConf.setBoolean("mapred.mapper.new-api", true);
+    MRHelpers.translateMRConfToTez(iReduceStageConf);
+
+    Configuration finalReduceConf = new JobConf(conf);
+    finalReduceConf.setInt(MRJobConfig.NUM_REDUCES, 1);
+    finalReduceConf.set(MRJobConfig.REDUCE_CLASS_ATTR,
+        MyOrderByNoOpReducer.class.getName());
+    finalReduceConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, IntWritable.class.getName());
+    finalReduceConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName());
+    MRHelpers.translateMRConfToTez(finalReduceConf);
+
+    MRHelpers.configureMRApiUsage(mapStageConf);
+    MRHelpers.configureMRApiUsage(iReduceStageConf);
+    MRHelpers.configureMRApiUsage(finalReduceConf);
+
+    List<Vertex> vertices = new ArrayList<Vertex>();
+
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream(4096);
+    mapStageConf.writeXml(outputStream);
+    String mapStageHistoryText = new String(outputStream.toByteArray(), "UTF-8");
+    DataSourceDescriptor dsd;
+    if (generateSplitsInClient) {
+      mapStageConf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR,
+          TextInputFormat.class.getName());
+      mapStageConf.set(FileInputFormat.INPUT_DIR, inputPath);
+      mapStageConf.setBoolean("mapred.mapper.new-api", true);
+      dsd = MRInputHelpers.configureMRInputWithLegacySplitGeneration(mapStageConf, stagingDir, true);
+    } else {
+      dsd = MRInputLegacy.createConfigurer(mapStageConf, TextInputFormat.class, inputPath).create();
+    }
+
+    Vertex mapVertex = new Vertex("initialmap", new ProcessorDescriptor(
+        MapProcessor.class.getName()).setUserPayload(
+        TezUtils.createUserPayloadFromConf(mapStageConf))
+        .setHistoryText(mapStageHistoryText)).setTaskLocalFiles(commonLocalResources);
+    mapVertex.addDataSource("MRInput", dsd);
+    vertices.add(mapVertex);
+
+    ByteArrayOutputStream iROutputStream = new ByteArrayOutputStream(4096);
+    iReduceStageConf.writeXml(iROutputStream);
+    String iReduceStageHistoryText = new String(iROutputStream.toByteArray(), "UTF-8");
+    Vertex ivertex = new Vertex("intermediate_reducer", new ProcessorDescriptor(
+        ReduceProcessor.class.getName())
+            .setUserPayload(TezUtils.createUserPayloadFromConf(iReduceStageConf))
+            .setHistoryText(iReduceStageHistoryText), 2);
+    ivertex.setTaskLocalFiles(commonLocalResources);
+    vertices.add(ivertex);
+
+    ByteArrayOutputStream finalReduceOutputStream = new ByteArrayOutputStream(4096);
+    finalReduceConf.writeXml(finalReduceOutputStream);
+    String finalReduceStageHistoryText = new String(finalReduceOutputStream.toByteArray(), "UTF-8");
+    UserPayload finalReducePayload = TezUtils.createUserPayloadFromConf(finalReduceConf);
+    Vertex finalReduceVertex = new Vertex("finalreduce",
+        new ProcessorDescriptor(
+            ReduceProcessor.class.getName())
+                .setUserPayload(finalReducePayload)
+                .setHistoryText(finalReduceStageHistoryText), 1);
+    finalReduceVertex.setTaskLocalFiles(commonLocalResources);
+    finalReduceVertex.addDataSink("MROutput",
+        MROutputLegacy.createConfigurer(finalReduceConf, TextOutputFormat.class, outputPath)
+            .create());
+    vertices.add(finalReduceVertex);
+
+    DAG dag = new DAG("OrderedWordCount" + dagIndex);
+    for (int i = 0; i < vertices.size(); ++i) {
+      dag.addVertex(vertices.get(i));
+    }
+
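+    // Wire the stages with ordered, partitioned shuffle edges. Each edge's
+    // key/value types match the downstream reducer's input types, and the
+    // legacy input variant is used so ReduceProcessor can consume the data.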
+    OrderedPartitionedKVEdgeConfigurer edgeConf1 = OrderedPartitionedKVEdgeConfigurer
+        .newBuilder(Text.class.getName(), IntWritable.class.getName(),
+            HashPartitioner.class.getName()).setFromConfiguration(conf)
+            .configureInput().useLegacyInput().done().build();
+    dag.addEdge(
+        new Edge(dag.getVertex("initialmap"), dag.getVertex("intermediate_reducer"),
+            edgeConf1.createDefaultEdgeProperty()));
+
+    OrderedPartitionedKVEdgeConfigurer edgeConf2 = OrderedPartitionedKVEdgeConfigurer
+        .newBuilder(IntWritable.class.getName(), Text.class.getName(),
+            HashPartitioner.class.getName()).setFromConfiguration(conf)
+            .configureInput().useLegacyInput().done().build();
+    dag.addEdge(
+        new Edge(dag.getVertex("intermediate_reducer"), dag.getVertex("finalreduce"),
+            edgeConf2.createDefaultEdgeProperty()));
+
+    return dag;
+  }
+
+  private static void printUsage() {
+    String options = " [-generateSplitsInClient true/<false>]";
+    System.err.println("Usage: testorderedwordcount <in> <out>" + options);
+    System.err.println("Usage (In Session Mode):"
+        + " testorderedwordcount <in1> <out1> ... <inN> <outN>" + options);
+    ToolRunner.printGenericCommandUsage(System.err);
+  }
+
+
+  @Override
+  public int run(String[] args) throws Exception {
+    Configuration conf = getConf();
+    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+
+    boolean generateSplitsInClient;
+
+    SplitsInClientOptionParser splitCmdLineParser = new SplitsInClientOptionParser();
+    try {
+      generateSplitsInClient = splitCmdLineParser.parse(otherArgs, false);
+      otherArgs = splitCmdLineParser.getRemainingArgs();
+    } catch (ParseException e1) {
+      System.err.println("Invalid options");
+      printUsage();
+      return 2;
+    }
+
+    boolean useTezSession = conf.getBoolean("USE_TEZ_SESSION", true);
+    long interJobSleepTimeout = conf.getInt("INTER_JOB_SLEEP_INTERVAL", 0)
+        * 1000;
+
+    boolean retainStagingDir = conf.getBoolean("RETAIN_STAGING_DIR", false);
+
+    if (((otherArgs.length%2) != 0)
+        || (!useTezSession && otherArgs.length != 2)) {
+      printUsage();
+      return 2;
+    }
+
+    List<String> inputPaths = new ArrayList<String>();
+    List<String> outputPaths = new ArrayList<String>();
+
+    for (int i = 0; i < otherArgs.length; i+=2) {
+      inputPaths.add(otherArgs[i]);
+      outputPaths.add(otherArgs[i+1]);
+    }
+
+    UserGroupInformation.setConfiguration(conf);
+
+    TezConfiguration tezConf = new TezConfiguration(conf);
+    TestOrderedWordCount instance = new TestOrderedWordCount();
+
+    FileSystem fs = FileSystem.get(conf);
+
+    String stagingDirStr =  conf.get(TezConfiguration.TEZ_AM_STAGING_DIR,
+            TezConfiguration.TEZ_AM_STAGING_DIR_DEFAULT) + Path.SEPARATOR + 
+            Long.toString(System.currentTimeMillis());
+    Path stagingDir = new Path(stagingDirStr);
+    FileSystem pathFs = stagingDir.getFileSystem(tezConf);
+    pathFs.mkdirs(new Path(stagingDirStr));
+
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirStr);
+    stagingDir = pathFs.makeQualified(new Path(stagingDirStr));
+    
+    TokenCache.obtainTokensForNamenodes(instance.credentials, new Path[] {stagingDir}, conf);
+    TezClientUtils.ensureStagingDirExists(tezConf, stagingDir);
+
+    // No need to add jar containing this class as assumed to be part of
+    // the tez jars.
+
+    // TEZ-674 Obtain tokens based on the Input / Output paths. For now assuming staging dir
+    // is the same filesystem as the one used for Input/Output.
+    
+    if (useTezSession) {
+      LOG.info("Creating Tez Session");
+      tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true);
+    } else {
+      tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, false);
+    }
+    TezClient tezSession = new TezClient("OrderedWordCountSession", tezConf,
+        null, instance.credentials);
+    tezSession.start();
+
+    DAGStatus dagStatus = null;
+    DAGClient dagClient = null;
+    String[] vNames = { "initialmap", "intermediate_reducer",
+      "finalreduce" };
+
+    Set<StatusGetOpts> statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
+    try {
+      for (int dagIndex = 1; dagIndex <= inputPaths.size(); ++dagIndex) {
+        if (dagIndex != 1
+            && interJobSleepTimeout > 0) {
+          try {
+            LOG.info("Sleeping between jobs, sleepInterval="
+                + (interJobSleepTimeout/1000));
+            Thread.sleep(interJobSleepTimeout);
+          } catch (InterruptedException e) {
+            LOG.info("Main thread interrupted. Breaking out of job loop");
+            break;
+          }
+        }
+
+        String inputPath = inputPaths.get(dagIndex-1);
+        String outputPath = outputPaths.get(dagIndex-1);
+
+        if (fs.exists(new Path(outputPath))) {
+          throw new FileAlreadyExistsException("Output directory "
+              + outputPath + " already exists");
+        }
+        LOG.info("Running OrderedWordCount DAG"
+            + ", dagIndex=" + dagIndex
+            + ", inputPath=" + inputPath
+            + ", outputPath=" + outputPath);
+
+        Map<String, LocalResource> localResources =
+          new TreeMap<String, LocalResource>();
+        
+        DAG dag = instance.createDAG(fs, conf, localResources,
+            stagingDir, dagIndex, inputPath, outputPath,
+            generateSplitsInClient);
+
+        boolean doPreWarm = dagIndex == 1 && useTezSession
+            && conf.getBoolean("PRE_WARM_SESSION", true);
+        int preWarmNumContainers = 0;
+        if (doPreWarm) {
+          preWarmNumContainers = conf.getInt("PRE_WARM_NUM_CONTAINERS", 0);
+          if (preWarmNumContainers <= 0) {
+            doPreWarm = false;
+          }
+        }
+        if (doPreWarm) {
+          LOG.info("Pre-warming Session");
+          PreWarmVertex preWarmVertex = new PreWarmVertex("PreWarm", preWarmNumContainers, dag
+              .getVertex("initialmap").getTaskResource());
+          preWarmVertex.setTaskLocalFiles(dag.getVertex("initialmap").getTaskLocalFiles());
+          preWarmVertex.setTaskEnvironment(dag.getVertex("initialmap").getTaskEnvironment());
+          preWarmVertex.setTaskLaunchCmdOpts(dag.getVertex("initialmap").getTaskLaunchCmdOpts());
+          
+          tezSession.preWarm(preWarmVertex);
+        }
+
+        if (useTezSession) {
+          LOG.info("Waiting for TezSession to get into ready state");
+          waitForTezSessionReady(tezSession);
+          LOG.info("Submitting DAG to Tez Session, dagIndex=" + dagIndex);
+          dagClient = tezSession.submitDAG(dag);
+          LOG.info("Submitted DAG to Tez Session, dagIndex=" + dagIndex);
+        } else {
+          LOG.info("Submitting DAG as a new Tez Application");
+          dagClient = tezSession.submitDAG(dag);
+        }
+
+        while (true) {
+          dagStatus = dagClient.getDAGStatus(statusGetOpts);
+          if (dagStatus.getState() == DAGStatus.State.RUNNING ||
+              dagStatus.getState() == DAGStatus.State.SUCCEEDED ||
+              dagStatus.getState() == DAGStatus.State.FAILED ||
+              dagStatus.getState() == DAGStatus.State.KILLED ||
+              dagStatus.getState() == DAGStatus.State.ERROR) {
+            break;
+          }
+          try {
+            Thread.sleep(500);
+          } catch (InterruptedException e) {
+            // ignore the interrupt and poll again
+          }
+        }
+
+
+        while (dagStatus.getState() != DAGStatus.State.SUCCEEDED &&
+            dagStatus.getState() != DAGStatus.State.FAILED &&
+            dagStatus.getState() != DAGStatus.State.KILLED &&
+            dagStatus.getState() != DAGStatus.State.ERROR) {
+          if (dagStatus.getState() == DAGStatus.State.RUNNING) {
+            ExampleDriver.printDAGStatus(dagClient, vNames);
+          }
+          try {
+            try {
+              Thread.sleep(1000);
+            } catch (InterruptedException e) {
+              // ignore the interrupt and poll again
+            }
+            dagStatus = dagClient.getDAGStatus(statusGetOpts);
+          } catch (TezException e) {
+            LOG.fatal("Failed to get application progress. Exiting");
+            return -1;
+          }
+        }
+        ExampleDriver.printDAGStatus(dagClient, vNames,
+            true, true);
+        LOG.info("DAG " + dagIndex + " completed. "
+            + "FinalState=" + dagStatus.getState());
+        if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
+          LOG.info("DAG " + dagIndex + " diagnostics: "
+            + dagStatus.getDiagnostics());
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Error occurred when submitting/running DAGs", e);
+      throw e;
+    } finally {
+      if (!retainStagingDir) {
+        pathFs.delete(stagingDir, true);
+      }
+      LOG.info("Shutting down session");
+      tezSession.stop();
+    }
+
+    if (!useTezSession) {
+      ExampleDriver.printDAGStatus(dagClient, vNames);
+      LOG.info("Application completed. " + "FinalState=" + dagStatus.getState());
+    }
+    return dagStatus.getState() == DAGStatus.State.SUCCEEDED ? 0 : 1;
+  }
+
+  private static void waitForTezSessionReady(TezClient tezSession)
+    throws IOException, TezException, InterruptedException {
+    tezSession.waitTillReady();
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(), new TestOrderedWordCount(), args);
+    System.exit(res);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/41f5cd8a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
new file mode 100644
index 0000000..a9b23e0
--- /dev/null
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
@@ -0,0 +1,295 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.examples;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.StringTokenizer;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.DataSinkDescriptor;
+import org.apache.tez.dag.api.DataSourceDescriptor;
+import org.apache.tez.dag.api.GroupInputEdge;
+import org.apache.tez.dag.api.VertexGroup;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.StatusGetOpts;
+import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.mapreduce.input.MRInput.MRInputConfigurer;
+import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.Output;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
+import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfigurer;
+import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValuesInput;
+import org.apache.tez.runtime.library.partitioner.HashPartitioner;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
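+/**
+ * Example DAG exercising vertex groups: map1 and map2 form the vertex group
+ * "union", which feeds the checker vertex through a GroupInputEdge, while
+ * map3 feeds the checker directly. All three maps tokenize the same input,
+ * so the checker can verify that each word's union count equals twice its
+ * map3 count, and finally emits the number of distinct words.
+ */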
+public class UnionExample {
+
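+  /**
+   * Tokenizes its input. The union members (map1, map2) write each token to
+   * both the "checker" and "parts" outputs; map3 writes only to "checker".
+   */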
+  public static class TokenProcessor extends SimpleMRProcessor {
+    IntWritable one = new IntWritable(1);
+    Text word = new Text();
+
+    public TokenProcessor(ProcessorContext context) {
+      super(context);
+    }
+
+    @Override
+    public void run() throws Exception {
+      Preconditions.checkArgument(getInputs().size() == 1);
+      boolean inUnion = true;
+      if (getContext().getTaskVertexName().equals("map3")) {
+        inUnion = false;
+      }
+      Preconditions.checkArgument(getOutputs().size() == (inUnion ? 2 : 1));
+      Preconditions.checkArgument(getOutputs().containsKey("checker"));
+      MRInput input = (MRInput) getInputs().values().iterator().next();
+      KeyValueReader kvReader = input.getReader();
+      Output output =  getOutputs().get("checker");
+      KeyValueWriter kvWriter = (KeyValueWriter) output.getWriter();
+      MROutput parts = null;
+      KeyValueWriter partsWriter = null;
+      if (inUnion) {
+        parts = (MROutput) getOutputs().get("parts");
+        partsWriter = parts.getWriter();
+      }
+      while (kvReader.next()) {
+        StringTokenizer itr = new StringTokenizer(kvReader.getCurrentValue().toString());
+        while (itr.hasMoreTokens()) {
+          word.set(itr.nextToken());
+          kvWriter.write(word, one);
+          if (inUnion) {
+            partsWriter.write(word, one);
+          }
+        }
+      }
+    }
+
+  }
+
+  public static class UnionProcessor extends SimpleMRProcessor {
+    IntWritable one = new IntWritable(1);
+
+    public UnionProcessor(ProcessorContext context) {
+      super(context);
+    }
+
+    @Override
+    public void run() throws Exception {
+      Preconditions.checkArgument(getInputs().size() == 2);
+      Preconditions.checkArgument(getOutputs().size() == 2);
+      MROutput out = (MROutput) getOutputs().get("union");
+      MROutput allParts = (MROutput) getOutputs().get("all-parts");
+      KeyValueWriter kvWriter = out.getWriter();
+      KeyValueWriter partsWriter = allParts.getWriter();
+      Map<String, AtomicInteger> unionKv = Maps.newHashMap();
+      LogicalInput union = getInputs().get("union");
+      KeyValuesReader kvReader = (KeyValuesReader) union.getReader();
+      while (kvReader.next()) {
+        String word = ((Text) kvReader.getCurrentKey()).toString();
+        IntWritable intVal = (IntWritable) kvReader.getCurrentValues().iterator().next();
+        for (int i = 0; i < intVal.get(); ++i) {
+          partsWriter.write(word, one);
+        }
+        AtomicInteger value = unionKv.get(word);
+        if (value == null) {
+          unionKv.put(word, new AtomicInteger(intVal.get()));
+        } else {
+          value.addAndGet(intVal.get());
+        }
+      }
+      LogicalInput map3 = getInputs().get("map3");
+      kvReader = (KeyValuesReader) map3.getReader();
+      while (kvReader.next()) {
+        String word = ((Text) kvReader.getCurrentKey()).toString();
+        IntWritable intVal = (IntWritable) kvReader.getCurrentValues().iterator().next();
+        AtomicInteger value = unionKv.get(word);
+        if (value == null) {
+          throw new TezUncheckedException("Expected to exist: " + word);
+        } else {
+          value.getAndAdd(intVal.get() * -2);
+        }
+      }
+      for (AtomicInteger value : unionKv.values()) {
+        if (value.get() != 0) {
+          throw new TezUncheckedException("Unexpected non-zero value");
+        }
+      }
+      kvWriter.write("Union", new IntWritable(unionKv.size()));
+    }
+
+  }
+
+  private DAG createDAG(FileSystem fs, TezConfiguration tezConf,
+      Map<String, LocalResource> localResources, Path stagingDir,
+      String inputPath, String outputPath) throws IOException {
+    DAG dag = new DAG("UnionExample");
+    
+    int numMaps = -1;
+    Configuration inputConf = new Configuration(tezConf);
+    MRInputConfigurer configurer = MRInput.createConfigurer(inputConf, TextInputFormat.class,
+        inputPath);
+    DataSourceDescriptor dataSource = configurer.generateSplitsInAM(false).create();
+
+    Vertex mapVertex1 = new Vertex("map1", new ProcessorDescriptor(
+        TokenProcessor.class.getName()), numMaps).addDataSource("MRInput", dataSource);
+
+    Vertex mapVertex2 = new Vertex("map2", new ProcessorDescriptor(
+        TokenProcessor.class.getName()), numMaps).addDataSource("MRInput", dataSource);
+
+    Vertex mapVertex3 = new Vertex("map3", new ProcessorDescriptor(
+        TokenProcessor.class.getName()), numMaps).addDataSource("MRInput", dataSource);
+
+    Vertex checkerVertex = new Vertex("checker", new ProcessorDescriptor(
+        UnionProcessor.class.getName()), 1);
+
+    Configuration outputConf = new Configuration(tezConf);
+    DataSinkDescriptor od = MROutput.createConfigurer(outputConf,
+        TextOutputFormat.class, outputPath).create();
+    checkerVertex.addDataSink("union", od);
+    
+
+    Configuration allPartsConf = new Configuration(tezConf);
+    DataSinkDescriptor od2 = MROutput.createConfigurer(allPartsConf,
+        TextOutputFormat.class, outputPath + "-all-parts").create();
+    checkerVertex.addDataSink("all-parts", od2);
+
+    Configuration partsConf = new Configuration(tezConf);    
+    DataSinkDescriptor od1 = MROutput.createConfigurer(partsConf,
+        TextOutputFormat.class, outputPath + "-parts").create();
+    VertexGroup unionVertex = dag.createVertexGroup("union", mapVertex1, mapVertex2);
+    unionVertex.addDataSink("parts", od1);
+
+    OrderedPartitionedKVEdgeConfigurer edgeConf = OrderedPartitionedKVEdgeConfigurer
+        .newBuilder(Text.class.getName(), IntWritable.class.getName(),
+            HashPartitioner.class.getName()).build();
+
+    dag.addVertex(mapVertex1)
+        .addVertex(mapVertex2)
+        .addVertex(mapVertex3)
+        .addVertex(checkerVertex)
+        .addEdge(
+            new Edge(mapVertex3, checkerVertex, edgeConf.createDefaultEdgeProperty()))
+        .addEdge(
+            new GroupInputEdge(unionVertex, checkerVertex, edgeConf.createDefaultEdgeProperty(),
+                new InputDescriptor(
+                    ConcatenatedMergedKeyValuesInput.class.getName())));
+    return dag;  
+  }
+
+  private static void printUsage() {
+    System.err.println("Usage: " + " unionexample <in1> <out1>");
+  }
+
+  public boolean run(String inputPath, String outputPath, Configuration conf) throws Exception {
+    System.out.println("Running UnionExample");
+    // conf and UGI
+    TezConfiguration tezConf;
+    if (conf != null) {
+      tezConf = new TezConfiguration(conf);
+    } else {
+      tezConf = new TezConfiguration();
+    }
+    UserGroupInformation.setConfiguration(tezConf);
+    String user = UserGroupInformation.getCurrentUser().getShortUserName();
+
+    // staging dir
+    FileSystem fs = FileSystem.get(tezConf);
+    String stagingDirStr = Path.SEPARATOR + "user" + Path.SEPARATOR
+        + user + Path.SEPARATOR+ ".staging" + Path.SEPARATOR
+        + Path.SEPARATOR + Long.toString(System.currentTimeMillis());    
+    Path stagingDir = new Path(stagingDirStr);
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirStr);
+    stagingDir = fs.makeQualified(stagingDir);
+    
+
+    // No need to add jar containing this class as assumed to be part of
+    // the tez jars.
+
+    // TEZ-674 Obtain tokens based on the Input / Output paths. For now assuming staging dir
+    // is the same filesystem as the one used for Input/Output.
+    
+    TezClient tezSession = new TezClient("UnionExampleSession", tezConf);
+    tezSession.start();
+
+    DAGClient dagClient = null;
+
+    try {
+        if (fs.exists(new Path(outputPath))) {
+          throw new FileAlreadyExistsException("Output directory "
+              + outputPath + " already exists");
+        }
+        
+        Map<String, LocalResource> localResources =
+          new TreeMap<String, LocalResource>();
+        
+        DAG dag = createDAG(fs, tezConf, localResources,
+            stagingDir, inputPath, outputPath);
+
+        tezSession.waitTillReady();
+        dagClient = tezSession.submitDAG(dag);
+
+        // monitoring
+        DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(EnumSet.of(StatusGetOpts.GET_COUNTERS));
+        if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
+          System.out.println("DAG diagnostics: " + dagStatus.getDiagnostics());
+          return false;
+        }
+        return true;
+    } finally {
+      fs.delete(stagingDir, true);
+      tezSession.stop();
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    if (args.length != 2) {
+      printUsage();
+      System.exit(2);
+    }
+    UnionExample job = new UnionExample();
+    job.run(args[0], args[1], null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/41f5cd8a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/helpers/SplitsInClientOptionParser.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/helpers/SplitsInClientOptionParser.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/helpers/SplitsInClientOptionParser.java
new file mode 100644
index 0000000..93ec860
--- /dev/null
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/helpers/SplitsInClientOptionParser.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.examples.helpers;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+
+import com.google.common.base.Preconditions;
+
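+/**
+ * Parses the -generateSplitsInClient command line option. parse() returns
+ * the option's value, or the supplied default if the option is absent, and
+ * getRemainingArgs() exposes the unconsumed arguments. Instances are single
+ * use: create a new parser for each argument set.
+ */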
+public class SplitsInClientOptionParser {
+
+  private CommandLine cmdLine;
+  private String[] otherArgs;
+
+  private boolean parsed = false;
+
+  public SplitsInClientOptionParser() {
+
+  }
+
+  public String[] getRemainingArgs() {
+    Preconditions.checkState(parsed,
+        "Cannot get remaining args without parsing");
+    return otherArgs;
+  }
+
+  @SuppressWarnings("static-access")
+  public boolean parse(String[] args, boolean defaultVal) throws ParseException {
+    Preconditions.checkState(!parsed,
+        "Create a new instance for different option sets");
+    parsed = true;
+    Options opts = new Options();
+    Option opt = OptionBuilder
+        .withArgName("splits_in_client")
+        .hasArg()
+        .withDescription(
+            "specify whether splits should be generated in the client")
+        .create("generateSplitsInClient");
+    opts.addOption(opt);
+    CommandLineParser parser = new GnuParser();
+
+    cmdLine = parser.parse(opts, args, false);
+    if (cmdLine.hasOption("generateSplitsInClient")) {
+      defaultVal = Boolean.parseBoolean(cmdLine
+          .getOptionValue("generateSplitsInClient"));
+    }
+    otherArgs = cmdLine.getArgs();
+    return defaultVal;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/41f5cd8a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordInputProcessor.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordInputProcessor.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordInputProcessor.java
new file mode 100644
index 0000000..aa7b836
--- /dev/null
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordInputProcessor.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.processor;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.mapreduce.examples.FilterLinesByWord;
+import org.apache.tez.mapreduce.examples.FilterLinesByWord.TextLongPair;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+import org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput;
+
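+/**
+ * Reads text lines via MRInput and forwards only those containing the
+ * filter word configured through FilterLinesByWord.FILTER_PARAM_NAME in the
+ * user payload. Each matching line is written to OnFileUnorderedKVOutput as
+ * (line, TextLongPair(source file, input key)).
+ */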
+public class FilterByWordInputProcessor extends AbstractLogicalIOProcessor {
+
+  private static final Log LOG = LogFactory.getLog(FilterByWordInputProcessor.class);
+
+  private String filterWord;
+
+  public FilterByWordInputProcessor(ProcessorContext context) {
+    super(context);
+  }
+
+
+  @Override
+  public void initialize() throws Exception {
+    Configuration conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
+    filterWord = conf.get(FilterLinesByWord.FILTER_PARAM_NAME);
+    if (filterWord == null) {
+      getContext().fatalError(null, "No filter word specified");
+    }
+  }
+
+  @Override
+  public void handleEvents(List<Event> processorEvents) {
+    throw new UnsupportedOperationException("Not expecting any events to the broadcast processor");
+
+  }
+
+  @Override
+  public void close() throws Exception {
+    LOG.info("Broadcast Processor closing. Nothing to do");
+  }
+
+  @Override
+  public void run(Map<String, LogicalInput> inputs,
+      Map<String, LogicalOutput> outputs) throws Exception {
+    
+    if (inputs.size() != 1) {
+      throw new IllegalStateException("FilterByWordInputProcessor processor can only work with a single input");
+    }
+
+    if (outputs.size() != 1) {
+      throw new IllegalStateException("FilterByWordInputProcessor processor can only work with a single output");
+    }
+    
+    for (LogicalInput input : inputs.values()) {
+      input.start();
+    }
+    for (LogicalOutput output : outputs.values()) {
+      output.start();
+    }
+
+    LogicalInput li = inputs.values().iterator().next();
+    if (! (li instanceof MRInput)) {
+      throw new IllegalStateException("FilterByWordInputProcessor processor can only work with MRInput");
+    }
+
+    LogicalOutput lo = outputs.values().iterator().next();
+    if (! (lo instanceof OnFileUnorderedKVOutput)) {
+      throw new IllegalStateException("FilterByWordInputProcessor processor can only work with OnFileUnorderedKVOutput");
+    }
+
+    MRInputLegacy mrInput = (MRInputLegacy) li;
+    mrInput.init();
+    OnFileUnorderedKVOutput kvOutput = (OnFileUnorderedKVOutput) lo;
+
+    Configuration updatedConf = mrInput.getConfigUpdates();
+    Text srcFile = new Text();
+    srcFile.set("UNKNOWN_FILENAME_IN_PROCESSOR");
+    if (updatedConf != null) {
+      String fileName = updatedConf.get(MRJobConfig.MAP_INPUT_FILE);
+      if (fileName != null) {
+        LOG.info("Processing file: " + fileName);
+        srcFile.set(fileName);
+      }
+    }
+
+    KeyValueReader kvReader = mrInput.getReader();
+    KeyValueWriter kvWriter = kvOutput.getWriter();
+
+    while (kvReader.next()) {
+      Object key = kvReader.getCurrentKey();
+      Object val = kvReader.getCurrentValue();
+
+      Text valText = (Text) val;
+      String readVal = valText.toString();
+      if (readVal.contains(filterWord)) {
+        LongWritable lineNum = (LongWritable) key;
+        TextLongPair outVal = new TextLongPair(srcFile, lineNum);
+        kvWriter.write(valText, outVal);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/41f5cd8a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordOutputProcessor.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordOutputProcessor.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordOutputProcessor.java
new file mode 100644
index 0000000..9eca13f
--- /dev/null
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordOutputProcessor.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.processor;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
+import org.apache.tez.mapreduce.processor.map.MapProcessor;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
+
+
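+/**
+ * Terminal pass-through stage: reads the shuffled, unordered key/value
+ * pairs produced upstream and writes them unchanged to the job's MROutput.
+ */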
+public class FilterByWordOutputProcessor extends SimpleMRProcessor {
+
+  private static final Log LOG = LogFactory.getLog(FilterByWordOutputProcessor.class);
+
+  public FilterByWordOutputProcessor(ProcessorContext context) {
+    super(context);
+  }
+
+
+  @Override
+  public void handleEvents(List<Event> processorEvents) {
+    throw new UnsupportedOperationException("Not expecting any events to the broadcast output processor");
+  }
+
+  @Override
+  public void close() throws Exception {
+    LOG.info("Broadcast Output Processor closing. Nothing to do");
+  }
+
+  @Override
+  public void run() throws Exception {
+    
+    if (inputs.size() != 1) {
+      throw new IllegalStateException("FilterByWordOutputProcessor processor can only work with a single input");
+    }
+
+    if (outputs.size() != 1) {
+      throw new IllegalStateException("FilterByWordOutputProcessor processor can only work with a single output");
+    }
+
+    for (LogicalInput input : inputs.values()) {
+      input.start();
+    }
+    for (LogicalOutput output : outputs.values()) {
+      output.start();
+    }
+
+    LogicalInput li = inputs.values().iterator().next();
+    if (! (li instanceof ShuffledUnorderedKVInput)) {
+      throw new IllegalStateException("FilterByWordOutputProcessor processor can only work with ShuffledUnorderedKVInput");
+    }
+
+    LogicalOutput lo = outputs.values().iterator().next();
+    if (! (lo instanceof MROutput)) {
+      throw new IllegalStateException("FilterByWordOutputProcessor processor can only work with MROutput");
+    }
+
+    ShuffledUnorderedKVInput kvInput = (ShuffledUnorderedKVInput) li;
+    MROutput mrOutput = (MROutput) lo;
+
+    KeyValueReader kvReader = kvInput.getReader();
+    KeyValueWriter kvWriter = mrOutput.getWriter();
+    while (kvReader.next()) {
+      Object key = kvReader.getCurrentKey();
+      Object value = kvReader.getCurrentValue();
+
+      kvWriter.write(key, value);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/41f5cd8a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/terasort/GenSort.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/terasort/GenSort.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/terasort/GenSort.java
new file mode 100644
index 0000000..c8517e5
--- /dev/null
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/terasort/GenSort.java
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.examples.terasort;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.math.BigInteger;
+import java.util.zip.Checksum;
+
+import org.apache.hadoop.util.PureJavaCrc32;
+
+/** 
+ * A single process data generator for the terasort data. Based on gensort.c 
+ * version 1.1 (3 Mar 2009) from Chris Nyberg <chris.nyberg@ordinal.com>.
+ */
+public class GenSort {
+
+  /**
+   * Generate a "binary" record suitable for all sort benchmarks *except* 
+   * PennySort.
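+   *
+   * Record layout (100 bytes): [0,10) key from the high random bytes,
+   * [10,12) break, [12,44) record number as 32 ascii hex digits,
+   * [44,48) break, [48,96) filler from the low 48 random bits,
+   * [96,100) break.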
+   */
+  static void generateRecord(byte[] recBuf, Unsigned16 rand, 
+                                     Unsigned16 recordNumber) {
+    /* generate the 10-byte key using the high 10 bytes of the 128-bit
+     * random number
+     */
+    for(int i=0; i < 10; ++i) {
+      recBuf[i] = rand.getByte(i);
+    }
+
+    /* add 2 bytes of "break" */
+    recBuf[10] = 0x00;
+    recBuf[11] = 0x11;
+
+    /* convert the 128-bit record number to 32 bits of ascii hexadecimal
+     * as the next 32 bytes of the record.
+     */
+    for (int i = 0; i < 32; i++) {
+      recBuf[12 + i] = (byte) recordNumber.getHexDigit(i);
+    }
+
+    /* add 4 bytes of "break" data */
+    recBuf[44] = (byte) 0x88;
+    recBuf[45] = (byte) 0x99;
+    recBuf[46] = (byte) 0xAA;
+    recBuf[47] = (byte) 0xBB;
+
+    /* add 48 bytes of filler based on low 48 bits of random number */
+    for(int i=0; i < 12; ++i) {
+      recBuf[48+i*4] = recBuf[49+i*4] = recBuf[50+i*4] = recBuf[51+i*4] =
+        (byte) rand.getHexDigit(20 + i);
+    }
+
+    /* add 4 bytes of "break" data */
+    recBuf[96] = (byte) 0xCC;
+    recBuf[97] = (byte) 0xDD;
+    recBuf[98] = (byte) 0xEE;
+    recBuf[99] = (byte) 0xFF;
+  }
+
+
+  private static BigInteger makeBigInteger(long x) {
+    byte[] data = new byte[8];
+    for(int i=0; i < 8; ++i) {
+      data[i] = (byte) (x >>> (56 - 8*i));
+    }
+    return new BigInteger(1, data);
+  }
+
+  private static final BigInteger NINETY_FIVE = new BigInteger("95");
+
+  /**
+   * Generate an ascii record suitable for all sort benchmarks including 
+   * PennySort.
+   */
+  static void generateAsciiRecord(byte[] recBuf, Unsigned16 rand, 
+                                  Unsigned16 recordNumber) {
+
+    /* generate the 10-byte ascii key using mostly the high 64 bits.
+     */
+    long temp = rand.getHigh8();
+    if (temp < 0) {
+      // use biginteger to avoid the negative sign problem
+      BigInteger bigTemp = makeBigInteger(temp);
+      recBuf[0] = (byte) (' ' + (bigTemp.mod(NINETY_FIVE).longValue()));
+      temp = bigTemp.divide(NINETY_FIVE).longValue();
+    } else {
+      recBuf[0] = (byte) (' ' + (temp % 95));
+      temp /= 95;      
+    }
+    for(int i=1; i < 8; ++i) {
+      recBuf[i] = (byte) (' ' + (temp % 95));
+      temp /= 95;      
+    }
+    temp = rand.getLow8();
+    if (temp < 0) {
+      BigInteger bigTemp = makeBigInteger(temp);
+      recBuf[8] = (byte) (' ' + (bigTemp.mod(NINETY_FIVE).longValue()));
+      temp = bigTemp.divide(NINETY_FIVE).longValue();      
+    } else {
+      recBuf[8] = (byte) (' ' + (temp % 95));
+      temp /= 95;
+    }
+    recBuf[9] = (byte)(' ' + (temp % 95));
+
+    /* add 2 bytes of "break" */
+    recBuf[10] = ' ';
+    recBuf[11] = ' ';
+
+    /* convert the 128-bit record number to 32 bits of ascii hexadecimal
+     * as the next 32 bytes of the record.
+     */
+    for (int i = 0; i < 32; i++) {
+      recBuf[12 + i] = (byte) recordNumber.getHexDigit(i);
+    }
+
+    /* add 2 bytes of "break" data */
+    recBuf[44] = ' ';
+    recBuf[45] = ' ';
+
+    /* add 52 bytes of filler based on low 48 bits of random number */
+    for(int i=0; i < 13; ++i) {
+      recBuf[46+i*4] = recBuf[47+i*4] = recBuf[48+i*4] = recBuf[49+i*4] =
+        (byte) rand.getHexDigit(19 + i);
+    }
+
+    /* add 2 bytes of "break" data */
+    recBuf[98] = '\r';	/* nice for Windows */
+    recBuf[99] = '\n';
+  }
+
+
+  private static void usage() {
+    PrintStream out = System.out;
+    out.println("usage: gensort [-a] [-c] [-bSTARTING_REC_NUM] NUM_RECS FILE_NAME");
+    out.println("-a        Generate ascii records required for PennySort or JouleSort.");
+    out.println("          These records are also an alternative input for the other");
+    out.println("          sort benchmarks.  Without this flag, binary records will be");
+    out.println("          generated that contain the highest density of randomness in");
+    out.println("          the 10-byte key.");
+    out.println( "-c        Calculate the sum of the crc32 checksums of each of the");
+    out.println("          generated records and send it to standard error.");
+    out.println("-bN       Set the beginning record generated to N. By default the");
+    out.println("          first record generated is record 0.");
+    out.println("NUM_RECS  The number of sequential records to generate.");
+    out.println("FILE_NAME The name of the file to write the records to.\n");
+    out.println("Example 1 - to generate 1000000 ascii records starting at record 0 to");
+    out.println("the file named \"pennyinput\":");
+    out.println("    gensort -a 1000000 pennyinput\n");
+    out.println("Example 2 - to generate 1000 binary records beginning with record 2000");
+    out.println("to the file named \"partition2\":");
+    out.println("    gensort -b2000 1000 partition2");
+    System.exit(1);
+  }
+
+
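+  /**
+   * Writes recordsToGenerate records to out, starting at firstRecordNumber.
+   * The random stream is positioned with Random16.skipAhead so that record
+   * ranges can be generated independently; if checksum is non-null, the
+   * CRC32 of every record is accumulated into it.
+   */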
+  public static void outputRecords(OutputStream out,
+                                   boolean useAscii,
+                                   Unsigned16 firstRecordNumber,
+                                   Unsigned16 recordsToGenerate,
+                                   Unsigned16 checksum
+                                   ) throws IOException {
+    byte[] row = new byte[100];
+    Unsigned16 recordNumber = new Unsigned16(firstRecordNumber);
+    Unsigned16 lastRecordNumber = new Unsigned16(firstRecordNumber);
+    Checksum crc = new PureJavaCrc32();
+    Unsigned16 tmp = new Unsigned16();
+    lastRecordNumber.add(recordsToGenerate);
+    Unsigned16 ONE = new Unsigned16(1);
+    Unsigned16 rand = Random16.skipAhead(firstRecordNumber);
+    while (!recordNumber.equals(lastRecordNumber)) {
+      Random16.nextRand(rand);
+      if (useAscii) {
+        generateAsciiRecord(row, rand, recordNumber);
+      } else {
+        generateRecord(row, rand, recordNumber);
+      }
+      if (checksum != null) {
+        crc.reset();
+        crc.update(row, 0, row.length);
+        tmp.set(crc.getValue());
+        checksum.add(tmp);
+      }
+      recordNumber.add(ONE);
+      out.write(row);
+    }
+  }
+                                   
+  public static void main(String[] args) throws Exception {
+    Unsigned16 startingRecord = new Unsigned16();
+    Unsigned16 numberOfRecords;
+    OutputStream out;
+    boolean useAscii = false;
+    Unsigned16 checksum = null;
+
+    int i;
+    for(i=0; i < args.length; ++i) {
+      String arg = args[i];
+      int argLength = arg.length();
+      if (argLength >= 1 && arg.charAt(0) == '-') {
+        if (argLength < 2) {
+          usage();
+        }
+        switch (arg.charAt(1)) {
+        case 'a':
+          useAscii = true;
+          break;
+        case 'b':
+          startingRecord = Unsigned16.fromDecimal(arg.substring(2));
+          break;
+        case 'c':
+          checksum = new Unsigned16();
+          break;
+        default:
+          usage();
+        }
+      } else {
+        break;
+      }
+    }
+    if (args.length - i != 2) {
+      usage();
+    }
+    numberOfRecords = Unsigned16.fromDecimal(args[i]);
+    out = new FileOutputStream(args[i+1]);
+
+    outputRecords(out, useAscii, startingRecord, numberOfRecords, checksum);
+    out.close();
+    if (checksum != null) {
+      System.out.println(checksum);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/41f5cd8a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/terasort/Random16.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/terasort/Random16.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/terasort/Random16.java
new file mode 100644
index 0000000..31cbd48
--- /dev/null
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/terasort/Random16.java
@@ -0,0 +1,374 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.examples.terasort;
+
+/**
+ * This class implements a 128-bit linear congruential generator.
+ * Specifically, if X0 is the most recently issued 128-bit random
+ * number (or a seed of 0 if no random number has yet been generated),
+ * the next number to be generated, X1, is equal to:
+ * X1 = (a * X0 + c) mod 2**128
+ * where a is 47026247687942121848144207491837523525
+ *            or 0x2360ed051fc65da44385df649fccf645
+ *   and c is 98910279301475397889117759788405497857
+ *            or 0x4a696d47726179524950202020202001
+ * The coefficient "a" is suggested by:
+ * Pierre L'Ecuyer, "Tables of linear congruential generators of different
+ * sizes and good lattice structure", Mathematics of Computation, 68
+ * pp. 249 - 260 (1999)
+ * http://www.ams.org/mcom/1999-68-225/S0025-5718-99-00996-5/S0025-5718-99-00996-5.pdf
+ * The constant "c" meets the simple suggestion by the same reference that
+ * it be odd.
+ *
+ * There is also a facility for quickly advancing the state of the
+ * generator by a fixed number of steps - this facilitates parallel
+ * generation.
+ *
+ * This is based on version 1.0 of rand16.c from Chris Nyberg
+ * <chris.nyberg@ordinal.com>.
+ */
+class Random16 {
+
+  /**
+   * The "Gen" array contains powers of 2 of the linear congruential generator.
+   * The index 0 struct contains the "a" coefficient and "c" constant for the
+   * generator.  That is, the generator is:
+   *    f(x) = (Gen[0].a * x + Gen[0].c) mod 2**128
+   *
+   * All structs after the first contain an "a" and "c" that
+   * comprise the square of the previous function:
+   *
+   * f**2(x) = (Gen[1].a * x + Gen[1].c) mod 2**128
+   * f**4(x) = (Gen[2].a * x + Gen[2].c) mod 2**128
+   * f**8(x) = (Gen[3].a * x + Gen[3].c) mod 2**128
+   * ...
+   */
+  private static class RandomConstant {
+    final Unsigned16 a;
+    final Unsigned16 c;
+    public RandomConstant(String left, String right) {
+      a = new Unsigned16(left);
+      c = new Unsigned16(right);
+    }
+  }
+
+  private static final RandomConstant[] genArray = new RandomConstant[]{
+    /* [  0] */ new RandomConstant("2360ed051fc65da44385df649fccf645", 
+                                   "4a696d47726179524950202020202001"),
+    /* [  1] */ new RandomConstant("17bce35bdf69743c529ed9eb20e0ae99", 
+                                   "95e0e48262b3edfe04479485c755b646"),
+    /* [  2] */ new RandomConstant("f4dd417327db7a9bd194dfbe42d45771", 
+                                   "882a02c315362b60765f100068b33a1c"),
+    /* [  3] */ new RandomConstant("6347af777a7898f6d1a2d6f33505ffe1", 
+                                   "5efc4abfaca23e8ca8edb1f2dfbf6478"),
+    /* [  4] */ new RandomConstant("b6a4239f3b315f84f6ef6d3d288c03c1", 
+                                   "f25bd15439d16af594c1b1bafa6239f0"),
+    /* [  5] */ new RandomConstant("2c82901ad1cb0cd182b631ba6b261781", 
+                                   "89ca67c29c9397d59c612596145db7e0"),
+    /* [  6] */ new RandomConstant("dab03f988288676ee49e66c4d2746f01", 
+                                   "8b6ae036713bd578a8093c8eae5c7fc0"),
+    /* [  7] */ new RandomConstant("602167331d86cf5684fe009a6d09de01", 
+                                   "98a2542fd23d0dbdff3b886cdb1d3f80"),
+    /* [  8] */ new RandomConstant("61ecb5c24d95b058f04c80a23697bc01", 
+                                   "954db923fdb7933e947cd1edcecb7f00"),
+    /* [  9] */ new RandomConstant("4a5c31e0654c28aa60474e83bf3f7801", 
+                                   "00be4a36657c98cd204e8c8af7dafe00"),
+    /* [ 10] */ new RandomConstant("ae4f079d54fbece1478331d3c6bef001", 
+                                   "991965329dccb28d581199ab18c5fc00"),
+    /* [ 11] */ new RandomConstant("101b8cb830c7cb927ff1ed50ae7de001", 
+                                   "e1a8705b63ad5b8cd6c3d268d5cbf800"),
+    /* [ 12] */ new RandomConstant("f54a27fc056b00e7563f3505e0fbc001", 
+                                   "2b657bbfd6ed9d632079e70c3c97f000"),
+    /* [ 13] */ new RandomConstant("df8a6fc1a833d201f98d719dd1f78001",
+                                   "59b60ee4c52fa49e9fe90682bd2fe000"),
+    /* [ 14] */ new RandomConstant("5480a5015f101a4ea7e3f183e3ef0001", 
+                                   "cc099c88030679464fe86aae8a5fc000"),
+    /* [ 15] */ new RandomConstant("a498509e76e5d7925f539c28c7de0001", 
+                                   "06b9abff9f9f33dd30362c0154bf8000"),
+    /* [ 16] */ new RandomConstant("0798a3d8b10dc72e60121cd58fbc0001", 
+                                   "e296707121688d5a0260b293a97f0000"),
+    /* [ 17] */ new RandomConstant("1647d1e78ec02e665fafcbbb1f780001", 
+                                   "189ffc4701ff23cb8f8acf6b52fe0000"),
+    /* [ 18] */ new RandomConstant("a7c982285e72bf8c0c8ddfb63ef00001", 
+                                   "5141110ab208fb9d61fb47e6a5fc0000"),
+    /* [ 19] */ new RandomConstant("3eb78ee8fb8c56dbc5d4e06c7de00001", 
+                                   "3c97caa62540f2948d8d340d4bf80000"),
+    /* [ 20] */ new RandomConstant("72d03b6f4681f2f9fe8e44d8fbc00001", 
+                                   "1b25cb9cfe5a0c963174f91a97f00000"),
+    /* [ 21] */ new RandomConstant("ea85f81e4f502c9bc8ae99b1f7800001", 
+                                   "0c644570b4a487103c5436352fe00000"),
+    /* [ 22] */ new RandomConstant("629c320db08b00c6bfa57363ef000001", 
+                                   "3d0589c28869472bde517c6a5fc00000"),
+    /* [ 23] */ new RandomConstant("c5c4b9ce268d074a386be6c7de000001", 
+                                   "bc95e5ab36477e65534738d4bf800000"),
+    /* [ 24] */ new RandomConstant("f30bbbbed1596187555bcd8fbc000001", 
+                                   "ddb02ff72a031c01011f71a97f000000"),
+    /* [ 25] */ new RandomConstant("4a1000fb26c9eeda3cc79b1f78000001", 
+                                   "2561426086d9acdb6c82e352fe000000"),
+    /* [ 26] */ new RandomConstant("89fb5307f6bf8ce2c1cf363ef0000001", 
+                                   "64a788e3c118ed1c8215c6a5fc000000"),
+    /* [ 27] */ new RandomConstant("830b7b3358a5d67ea49e6c7de0000001", 
+                                   "e65ea321908627cfa86b8d4bf8000000"),
+    /* [ 28] */ new RandomConstant("fd8a51da91a69fe1cd3cd8fbc0000001", 
+                                   "53d27225604d85f9e1d71a97f0000000"),
+    /* [ 29] */ new RandomConstant("901a48b642b90b55aa79b1f780000001", 
+                                   "ca5ec7a3ed1fe55e07ae352fe0000000"),
+    /* [ 30] */ new RandomConstant("118cdefdf32144f394f363ef00000001", 
+                                   "4daebb2e085330651f5c6a5fc0000000"),
+    /* [ 31] */ new RandomConstant("0a88c0a91cff430829e6c7de00000001", 
+                                   "9d6f1a00a8f3f76e7eb8d4bf80000000"),
+    /* [ 32] */ new RandomConstant("433bef4314f16a9453cd8fbc00000001", 
+                                   "158c62f2b31e496dfd71a97f00000000"),
+    /* [ 33] */ new RandomConstant("c294b02995ae6738a79b1f7800000001", 
+                                   "290e84a2eb15fd1ffae352fe00000000"),
+    /* [ 34] */ new RandomConstant("913575e0da8b16b14f363ef000000001", 
+                                   "e3dc1bfbe991a34ff5c6a5fc00000000"),
+    /* [ 35] */ new RandomConstant("2f61b9f871cf4e629e6c7de000000001", 
+                                   "ddf540d020b9eadfeb8d4bf800000000"),
+    /* [ 36] */ new RandomConstant("78d26ccbd68320c53cd8fbc000000001", 
+                                   "8ee4950177ce66bfd71a97f000000000"),
+    /* [ 37] */ new RandomConstant("8b7ebd037898518a79b1f78000000001", 
+                                   "39e0f787c907117fae352fe000000000"),
+    /* [ 38] */ new RandomConstant("0b5507b61f78e314f363ef0000000001", 
+                                   "659d2522f7b732ff5c6a5fc000000000"),
+    /* [ 39] */ new RandomConstant("4f884628f812c629e6c7de0000000001", 
+                                   "9e8722938612a5feb8d4bf8000000000"),
+    /* [ 40] */ new RandomConstant("be896744d4a98c53cd8fbc0000000001", 
+                                   "e941a65d66b64bfd71a97f0000000000"),
+    /* [ 41] */ new RandomConstant("daf63a553b6318a79b1f780000000001", 
+                                   "7b50d19437b097fae352fe0000000000"),
+    /* [ 42] */ new RandomConstant("2d7a23d8bf06314f363ef00000000001", 
+                                   "59d7b68e18712ff5c6a5fc0000000000"),
+    /* [ 43] */ new RandomConstant("392b046a9f0c629e6c7de00000000001", 
+                                   "4087bab2d5225feb8d4bf80000000000"),
+    /* [ 44] */ new RandomConstant("eb30fbb9c218c53cd8fbc00000000001", 
+                                   "b470abc03b44bfd71a97f00000000000"),
+    /* [ 45] */ new RandomConstant("b9cdc30594318a79b1f7800000000001", 
+                                   "366630eaba897fae352fe00000000000"),
+    /* [ 46] */ new RandomConstant("014ab453686314f363ef000000000001", 
+                                   "a2dfc77e8512ff5c6a5fc00000000000"),
+    /* [ 47] */ new RandomConstant("395221c7d0c629e6c7de000000000001", 
+                                   "1e0d25a14a25feb8d4bf800000000000"),
+    /* [ 48] */ new RandomConstant("4d972813a18c53cd8fbc000000000001", 
+                                   "9d50a5d3944bfd71a97f000000000000"),
+    /* [ 49] */ new RandomConstant("06f9e2374318a79b1f78000000000001", 
+                                   "bf7ab5eb2897fae352fe000000000000"),
+    /* [ 50] */ new RandomConstant("bd220cae86314f363ef0000000000001", 
+                                   "925b14e6512ff5c6a5fc000000000000"),
+    /* [ 51] */ new RandomConstant("36fd3a5d0c629e6c7de0000000000001", 
+                                   "724cce0ca25feb8d4bf8000000000000"),
+    /* [ 52] */ new RandomConstant("60def8ba18c53cd8fbc0000000000001", 
+                                   "1af42d1944bfd71a97f0000000000000"),
+    /* [ 53] */ new RandomConstant("8d500174318a79b1f780000000000001", 
+                                   "0f529e32897fae352fe0000000000000"),
+    /* [ 54] */ new RandomConstant("48e842e86314f363ef00000000000001", 
+                                   "844e4c6512ff5c6a5fc0000000000000"),
+    /* [ 55] */ new RandomConstant("4af185d0c629e6c7de00000000000001",
+                                   "9f40d8ca25feb8d4bf80000000000000"),
+    /* [ 56] */ new RandomConstant("7a670ba18c53cd8fbc00000000000001",
+                                   "9912b1944bfd71a97f00000000000000"),
+    /* [ 57] */ new RandomConstant("86de174318a79b1f7800000000000001",
+                                   "9c69632897fae352fe00000000000000"),
+    /* [ 58] */ new RandomConstant("55fc2e86314f363ef000000000000001",
+                                   "e1e2c6512ff5c6a5fc00000000000000"),
+    /* [ 59] */ new RandomConstant("ccf85d0c629e6c7de000000000000001",
+                                   "68058ca25feb8d4bf800000000000000"),
+    /* [ 60] */ new RandomConstant("1df0ba18c53cd8fbc000000000000001",
+                                   "610b1944bfd71a97f000000000000000"),
+    /* [ 61] */ new RandomConstant("4be174318a79b1f78000000000000001",
+                                   "061632897fae352fe000000000000000"),
+    /* [ 62] */ new RandomConstant("d7c2e86314f363ef0000000000000001",
+                                   "1c2c6512ff5c6a5fc000000000000000"),
+    /* [ 63] */ new RandomConstant("af85d0c629e6c7de0000000000000001",
+                                   "7858ca25feb8d4bf8000000000000000"),
+    /* [ 64] */ new RandomConstant("5f0ba18c53cd8fbc0000000000000001",
+                                   "f0b1944bfd71a97f0000000000000000"),
+    /* [ 65] */ new RandomConstant("be174318a79b1f780000000000000001",
+                                   "e1632897fae352fe0000000000000000"),
+    /* [ 66] */ new RandomConstant("7c2e86314f363ef00000000000000001",
+                                   "c2c6512ff5c6a5fc0000000000000000"),
+    /* [ 67] */ new RandomConstant("f85d0c629e6c7de00000000000000001",
+                                   "858ca25feb8d4bf80000000000000000"),
+    /* [ 68] */ new RandomConstant("f0ba18c53cd8fbc00000000000000001",
+                                   "0b1944bfd71a97f00000000000000000"),
+    /* [ 69] */ new RandomConstant("e174318a79b1f7800000000000000001",
+                                   "1632897fae352fe00000000000000000"),
+    /* [ 70] */ new RandomConstant("c2e86314f363ef000000000000000001",
+                                   "2c6512ff5c6a5fc00000000000000000"),
+    /* [ 71] */ new RandomConstant("85d0c629e6c7de000000000000000001",
+                                   "58ca25feb8d4bf800000000000000000"),
+    /* [ 72] */ new RandomConstant("0ba18c53cd8fbc000000000000000001",
+                                   "b1944bfd71a97f000000000000000000"),
+    /* [ 73] */ new RandomConstant("174318a79b1f78000000000000000001",
+                                   "632897fae352fe000000000000000000"),
+    /* [ 74] */ new RandomConstant("2e86314f363ef0000000000000000001",
+                                   "c6512ff5c6a5fc000000000000000000"),
+    /* [ 75] */ new RandomConstant("5d0c629e6c7de0000000000000000001",
+                                   "8ca25feb8d4bf8000000000000000000"),
+    /* [ 76] */ new RandomConstant("ba18c53cd8fbc0000000000000000001",
+                                   "1944bfd71a97f0000000000000000000"),
+    /* [ 77] */ new RandomConstant("74318a79b1f780000000000000000001",
+                                   "32897fae352fe0000000000000000000"),
+    /* [ 78] */ new RandomConstant("e86314f363ef00000000000000000001",
+                                   "6512ff5c6a5fc0000000000000000000"),
+    /* [ 79] */ new RandomConstant("d0c629e6c7de00000000000000000001",
+                                   "ca25feb8d4bf80000000000000000000"),
+    /* [ 80] */ new RandomConstant("a18c53cd8fbc00000000000000000001",
+                                   "944bfd71a97f00000000000000000000"),
+    /* [ 81] */ new RandomConstant("4318a79b1f7800000000000000000001",
+                                   "2897fae352fe00000000000000000000"),
+    /* [ 82] */ new RandomConstant("86314f363ef000000000000000000001",
+                                   "512ff5c6a5fc00000000000000000000"),
+    /* [ 83] */ new RandomConstant("0c629e6c7de000000000000000000001",
+                                   "a25feb8d4bf800000000000000000000"),
+    /* [ 84] */ new RandomConstant("18c53cd8fbc000000000000000000001",
+                                   "44bfd71a97f000000000000000000000"),
+    /* [ 85] */ new RandomConstant("318a79b1f78000000000000000000001",
+                                   "897fae352fe000000000000000000000"),
+    /* [ 86] */ new RandomConstant("6314f363ef0000000000000000000001",
+                                   "12ff5c6a5fc000000000000000000000"),
+    /* [ 87] */ new RandomConstant("c629e6c7de0000000000000000000001",
+                                   "25feb8d4bf8000000000000000000000"),
+    /* [ 88] */ new RandomConstant("8c53cd8fbc0000000000000000000001",
+                                   "4bfd71a97f0000000000000000000000"),
+    /* [ 89] */ new RandomConstant("18a79b1f780000000000000000000001",
+                                   "97fae352fe0000000000000000000000"),
+    /* [ 90] */ new RandomConstant("314f363ef00000000000000000000001",
+                                   "2ff5c6a5fc0000000000000000000000"),
+    /* [ 91] */ new RandomConstant("629e6c7de00000000000000000000001",
+                                   "5feb8d4bf80000000000000000000000"),
+    /* [ 92] */ new RandomConstant("c53cd8fbc00000000000000000000001",
+                                   "bfd71a97f00000000000000000000000"),
+    /* [ 93] */ new RandomConstant("8a79b1f7800000000000000000000001",
+                                   "7fae352fe00000000000000000000000"),
+    /* [ 94] */ new RandomConstant("14f363ef000000000000000000000001",
+                                   "ff5c6a5fc00000000000000000000000"),
+    /* [ 95] */ new RandomConstant("29e6c7de000000000000000000000001",
+                                   "feb8d4bf800000000000000000000000"),
+    /* [ 96] */ new RandomConstant("53cd8fbc000000000000000000000001",
+                                   "fd71a97f000000000000000000000000"),
+    /* [ 97] */ new RandomConstant("a79b1f78000000000000000000000001",
+                                   "fae352fe000000000000000000000000"),
+    /* [ 98] */ new RandomConstant("4f363ef0000000000000000000000001",
+                                   "f5c6a5fc000000000000000000000000"),
+    /* [ 99] */ new RandomConstant("9e6c7de0000000000000000000000001",
+                                   "eb8d4bf8000000000000000000000000"),
+    /* [100] */ new RandomConstant("3cd8fbc0000000000000000000000001",
+                                   "d71a97f0000000000000000000000000"),
+    /* [101] */ new RandomConstant("79b1f780000000000000000000000001",
+                                   "ae352fe0000000000000000000000000"),
+    /* [102] */ new RandomConstant("f363ef00000000000000000000000001",
+                                   "5c6a5fc0000000000000000000000000"),
+    /* [103] */ new RandomConstant("e6c7de00000000000000000000000001",
+                                   "b8d4bf80000000000000000000000000"),
+    /* [104] */ new RandomConstant("cd8fbc00000000000000000000000001",
+                                   "71a97f00000000000000000000000000"),
+    /* [105] */ new RandomConstant("9b1f7800000000000000000000000001",
+                                   "e352fe00000000000000000000000000"),
+    /* [106] */ new RandomConstant("363ef000000000000000000000000001",
+                                   "c6a5fc00000000000000000000000000"),
+    /* [107] */ new RandomConstant("6c7de000000000000000000000000001",
+                                   "8d4bf800000000000000000000000000"),
+    /* [108] */ new RandomConstant("d8fbc000000000000000000000000001",
+                                   "1a97f000000000000000000000000000"),
+    /* [109] */ new RandomConstant("b1f78000000000000000000000000001",
+                                   "352fe000000000000000000000000000"),
+    /* [110] */ new RandomConstant("63ef0000000000000000000000000001",
+                                   "6a5fc000000000000000000000000000"),
+    /* [111] */ new RandomConstant("c7de0000000000000000000000000001",
+                                   "d4bf8000000000000000000000000000"),
+    /* [112] */ new RandomConstant("8fbc0000000000000000000000000001",
+                                   "a97f0000000000000000000000000000"),
+    /* [113] */ new RandomConstant("1f780000000000000000000000000001",
+                                   "52fe0000000000000000000000000000"),
+    /* [114] */ new RandomConstant("3ef00000000000000000000000000001",
+                                   "a5fc0000000000000000000000000000"),
+    /* [115] */ new RandomConstant("7de00000000000000000000000000001",
+                                   "4bf80000000000000000000000000000"),
+    /* [116] */ new RandomConstant("fbc00000000000000000000000000001",
+                                   "97f00000000000000000000000000000"),
+    /* [117] */ new RandomConstant("f7800000000000000000000000000001",
+                                   "2fe00000000000000000000000000000"),
+    /* [118] */ new RandomConstant("ef000000000000000000000000000001",
+                                   "5fc00000000000000000000000000000"),
+    /* [119] */ new RandomConstant("de000000000000000000000000000001",
+                                   "bf800000000000000000000000000000"),
+    /* [120] */ new RandomConstant("bc000000000000000000000000000001",
+                                   "7f000000000000000000000000000000"),
+    /* [121] */ new RandomConstant("78000000000000000000000000000001",
+                                   "fe000000000000000000000000000000"),
+    /* [122] */ new RandomConstant("f0000000000000000000000000000001",
+                                   "fc000000000000000000000000000000"),
+    /* [123] */ new RandomConstant("e0000000000000000000000000000001",
+                                   "f8000000000000000000000000000000"),
+    /* [124] */ new RandomConstant("c0000000000000000000000000000001",
+                                   "f0000000000000000000000000000000"),
+    /* [125] */ new RandomConstant("80000000000000000000000000000001",
+                                   "e0000000000000000000000000000000"),
+    /* [126] */ new RandomConstant("00000000000000000000000000000001",
+                                   "c0000000000000000000000000000000"),
+    /* [127] */ new RandomConstant("00000000000000000000000000000001",
+                                   "80000000000000000000000000000000")};
+
+  /**
+   * Generate the random number that is "advance" steps
+   * from an initial random number of 0.  This is done by
+   * starting with 0 and then advancing it by the
+   * appropriate powers of 2 of the linear congruential
+   * generator.
+   */
+  public static Unsigned16 skipAhead(Unsigned16 advance) {
+    Unsigned16 result = new Unsigned16();
+    long bit_map = advance.getLow8();
+    for (int i = 0; bit_map != 0 && i < 64; i++) {
+      if ((bit_map & (1L << i)) != 0) {
+        /* advance random number by f**(2**i) (x)
+         */
+        result.multiply(genArray[i].a);
+        result.add(genArray[i].c);
+        bit_map &= ~(1L << i);
+      }
+    }
+    bit_map = advance.getHigh8();
+    for (int i = 0; bit_map != 0 && i < 64; i++) {
+      if ((bit_map & (1L << i)) != 0) {
+        /* advance random number by f**(2**(i + 64)) (x)
+         */
+        result.multiply(genArray[i+64].a);
+        result.add(genArray[i+64].c);
+        bit_map &= ~(1L << i);
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Generate the next 16-byte random number.
+   */
+  public static void nextRand(Unsigned16 rand) {
+    /* advance the random number forward once using the linear congruential
+     * generator, and then return the new random number
+     */
+    rand.multiply(genArray[0].a);
+    rand.add(genArray[0].c);
+  }
+}
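
Each genArray entry is derived mechanically from the previous one: composing
f(x) = a*x + c with itself gives f(f(x)) = a^2*x + (a + 1)*c (mod 2**128).
A quick cross-check of that relation with plain BigInteger — a standalone
sketch, with no Unsigned16 required, and expected values copied from
genArray[1] above:

    import java.math.BigInteger;

    public class Random16Check {
      private static final BigInteger MOD = BigInteger.ONE.shiftLeft(128);

      public static void main(String[] args) {
        BigInteger a0 = new BigInteger("2360ed051fc65da44385df649fccf645", 16);
        BigInteger c0 = new BigInteger("4a696d47726179524950202020202001", 16);

        // Square of the affine map: a' = a^2, c' = (a + 1) * c, both mod 2^128
        BigInteger a1 = a0.multiply(a0).mod(MOD);
        BigInteger c1 = a0.add(BigInteger.ONE).multiply(c0).mod(MOD);
        System.out.println(a1.toString(16)); // expect 17bce35bdf69743c529ed9eb20e0ae99
        System.out.println(c1.toString(16)); // expect 95e0e48262b3edfe04479485c755b646

        // skipAhead(n) from seed 0 should match n direct applications of f:
        BigInteger x = BigInteger.ZERO;
        for (int i = 0; i < 5; i++) {
          x = a0.multiply(x).add(c0).mod(MOD);
        }
        System.out.println(x.toString(16)); // compare with Random16.skipAhead(new Unsigned16(5))
      }
    }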

http://git-wip-us.apache.org/repos/asf/tez/blob/41f5cd8a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraChecksum.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraChecksum.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraChecksum.java
new file mode 100644
index 0000000..a5b408b
--- /dev/null
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraChecksum.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.examples.terasort;
+
+import java.io.IOException;
+import java.util.zip.Checksum;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.PureJavaCrc32;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+public class TeraChecksum extends Configured implements Tool {
+  static class ChecksumMapper 
+      extends Mapper<Text, Text, NullWritable, Unsigned16> {
+    private Unsigned16 checksum = new Unsigned16();
+    private Unsigned16 sum = new Unsigned16();
+    private Checksum crc32 = new PureJavaCrc32();
+
+    @Override
+    public void map(Text key, Text value,
+                    Context context) throws IOException {
+      crc32.reset();
+      crc32.update(key.getBytes(), 0, key.getLength());
+      crc32.update(value.getBytes(), 0, value.getLength());
+      checksum.set(crc32.getValue());
+      sum.add(checksum);
+    }
+
+    @Override
+    public void cleanup(Context context)
+        throws IOException, InterruptedException {
+      context.write(NullWritable.get(), sum);
+    }
+  }
+
+  static class ChecksumReducer 
+      extends Reducer<NullWritable, Unsigned16, NullWritable, Unsigned16> {
+
+    @Override
+    public void reduce(NullWritable key, Iterable<Unsigned16> values,
+        Context context) throws IOException, InterruptedException {
+      Unsigned16 sum = new Unsigned16();
+      for (Unsigned16 val : values) {
+        sum.add(val);
+      }
+      context.write(key, sum);
+    }
+  }
+
+  private static void usage() throws IOException {
+    System.err.println("terasum <out-dir> <report-dir>");
+  }
+
+  public int run(String[] args) throws Exception {
+    Job job = Job.getInstance(getConf());
+    if (args.length != 2) {
+      usage();
+      return 2;
+    }
+    TeraInputFormat.setInputPaths(job, new Path(args[0]));
+    FileOutputFormat.setOutputPath(job, new Path(args[1]));
+    job.setJobName("TeraSum");
+    job.setJarByClass(TeraChecksum.class);
+    job.setMapperClass(ChecksumMapper.class);
+    job.setReducerClass(ChecksumReducer.class);
+    job.setOutputKeyClass(NullWritable.class);
+    job.setOutputValueClass(Unsigned16.class);
+    // force a single reducer
+    job.setNumReduceTasks(1);
+    job.setInputFormatClass(TeraInputFormat.class);
+    return job.waitForCompletion(true) ? 0 : 1;
+  }
+
+  /**
+   * @param args command-line arguments, passed through to {@link ToolRunner}
+   */
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(), new TeraChecksum(), args);
+    System.exit(res);
+  }
+
+}
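
The reduce-side arithmetic is just a commutative 128-bit sum of per-record
CRC32 values, which is why split order doesn't matter and a single reducer
suffices. A minimal local sketch of the same aggregation, with
java.util.zip.CRC32 standing in for PureJavaCrc32 and BigInteger for
Unsigned16 — both substitutions are for illustration only:

    import java.math.BigInteger;
    import java.nio.charset.StandardCharsets;
    import java.util.zip.CRC32;

    public class ChecksumSketch {
      private static final BigInteger MOD = BigInteger.ONE.shiftLeft(128);

      public static void main(String[] args) {
        String[][] records = { {"key1", "value1"}, {"key2", "value2"} };
        BigInteger sum = BigInteger.ZERO;
        CRC32 crc = new CRC32();
        for (String[] kv : records) {
          crc.reset(); // one CRC per record, as in ChecksumMapper.map()
          crc.update(kv[0].getBytes(StandardCharsets.UTF_8));
          crc.update(kv[1].getBytes(StandardCharsets.UTF_8));
          sum = sum.add(BigInteger.valueOf(crc.getValue())).mod(MOD);
        }
        System.out.println(sum.toString(16)); // 128-bit total, analogous to the TeraSum report
      }
    }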

