tez-commits mailing list archives

From hit...@apache.org
Subject svn commit: r1469642 [30/36] - in /incubator/tez/branches/TEZ-1: ./ example_jobs/ example_jobs/sampleInput/ example_jobs/wc_mr_6m_1r/ example_jobs/wc_mrr_6m_3r_3r/ ljr_helper/ tez-common/ tez-common/src/ tez-common/src/main/ tez-common/src/main/java/ t...
Date Thu, 18 Apr 2013 23:54:28 GMT
Added: incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraScheduler.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraScheduler.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraScheduler.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraScheduler.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.examples.terasort;
+
+import java.io.*;
+import java.util.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+import org.apache.tez.mapreduce.examples.terasort.TeraInputFormat.TeraFileSplit;
+
+import com.google.common.base.Charsets;
+
+class TeraScheduler {
+  static String USE = "mapreduce.terasort.use.terascheduler";
+  private static final Log LOG = LogFactory.getLog(TeraScheduler.class);
+  private Split[] splits;
+  private List<Host> hosts = new ArrayList<Host>();
+  private int slotsPerHost;
+  private int remainingSplits = 0;
+  private FileSplit[] realSplits = null;
+
+  static class Split {
+    String filename;
+    boolean isAssigned = false;
+    List<Host> locations = new ArrayList<Host>();
+    Split(String filename) {
+      this.filename = filename;
+    }
+    public String toString() {
+      StringBuffer result = new StringBuffer();
+      result.append(filename);
+      result.append(" on ");
+      for(Host host: locations) {
+        result.append(host.hostname);
+        result.append(", ");
+      }
+      return result.toString();
+    }
+  }
+  static class Host {
+    String hostname;
+    List<Split> splits = new ArrayList<Split>();
+    Host(String hostname) {
+      this.hostname = hostname;
+    }
+    public String toString() {
+      StringBuffer result = new StringBuffer();
+      result.append(splits.size());
+      result.append(" ");
+      result.append(hostname);
+      return result.toString();
+    }
+  }
+
+  List<String> readFile(String filename) throws IOException {
+    List<String> result = new ArrayList<String>(10000);
+    BufferedReader in = new BufferedReader(
+        new InputStreamReader(new FileInputStream(filename), Charsets.UTF_8));
+    String line = in.readLine();
+    while (line != null) {
+      result.add(line);
+      line = in.readLine();
+    }
+    in.close();
+    return result;
+  }
+
+  public TeraScheduler(String splitFilename, 
+                       String nodeFilename) throws IOException {
+    slotsPerHost = 4;
+    // get the hosts
+    Map<String, Host> hostIds = new HashMap<String,Host>();
+    for(String hostName: readFile(nodeFilename)) {
+      Host host = new Host(hostName);
+      hosts.add(host);
+      hostIds.put(hostName, host);
+    }
+    // read the blocks
+    List<String> splitLines = readFile(splitFilename);
+    splits = new Split[splitLines.size()];
+    remainingSplits = 0;
+    for(String line: splitLines) {
+      StringTokenizer itr = new StringTokenizer(line);
+      Split newSplit = new Split(itr.nextToken());
+      splits[remainingSplits++] = newSplit;
+      while (itr.hasMoreTokens()) {
+        Host host = hostIds.get(itr.nextToken());
+        newSplit.locations.add(host);
+        host.splits.add(newSplit);
+      }
+    }
+  }
+
+  public TeraScheduler(FileSplit[] realSplits,
+                       Configuration conf) throws IOException {
+    this.realSplits = realSplits;
+    this.slotsPerHost = conf.getInt(TTConfig.TT_MAP_SLOTS, 4);
+    Map<String, Host> hostTable = new HashMap<String, Host>();
+    splits = new Split[realSplits.length];
+    for(FileSplit realSplit: realSplits) {
+      Split split = new Split(realSplit.getPath().toString());
+      splits[remainingSplits++] = split;
+      for(String hostname: realSplit.getLocations()) {
+        Host host = hostTable.get(hostname);
+        if (host == null) {
+          host = new Host(hostname);
+          hostTable.put(hostname, host);
+          hosts.add(host);
+        }
+        host.splits.add(split);
+        split.locations.add(host);
+      }
+    }
+  }
+
+  Host pickBestHost() {
+    Host result = null;
+    int splits = Integer.MAX_VALUE;
+    for(Host host: hosts) {
+      if (host.splits.size() < splits) {
+        result = host;
+        splits = host.splits.size();
+      }
+    }
+    if (result != null) {
+      hosts.remove(result);
+      LOG.debug("picking " + result);
+    }
+    return result;
+  }
+
+  void pickBestSplits(Host host) {
+    int tasksToPick = Math.min(slotsPerHost, 
+                               (int) Math.ceil((double) remainingSplits / 
+                                               hosts.size()));
+    Split[] best = new Split[tasksToPick];
+    for(Split cur: host.splits) {
+      LOG.debug("  examine: " + cur.filename + " " + cur.locations.size());
+      int i = 0;
+      while (i < tasksToPick && best[i] != null && 
+             best[i].locations.size() <= cur.locations.size()) {
+        i += 1;
+      }
+      if (i < tasksToPick) {
+        for(int j = tasksToPick - 1; j > i; --j) {
+          best[j] = best[j-1];
+        }
+        best[i] = cur;
+      }
+    }
+    // for the chosen blocks, remove them from the other locations
+    for(int i=0; i < tasksToPick; ++i) {
+      if (best[i] != null) {
+        LOG.debug(" best: " + best[i].filename);
+        for (Host other: best[i].locations) {
+          other.splits.remove(best[i]);
+        }
+        best[i].locations.clear();
+        best[i].locations.add(host);
+        best[i].isAssigned = true;
+        remainingSplits -= 1;
+      }
+    }
+    // for the non-chosen blocks, remove this host
+    for(Split cur: host.splits) {
+      if (!cur.isAssigned) {
+        cur.locations.remove(host);
+      }
+    }
+  }
+  
+  void solve() throws IOException {
+    Host host = pickBestHost();
+    while (host != null) {
+      pickBestSplits(host);
+      host = pickBestHost();
+    }
+  }
+
+  /**
+   * Solve the schedule and modify the FileSplit array to reflect the new
+   * schedule. It will move placed splits to the front and unplaceable
+   * splits to the end.
+   * @return a new list of FileSplits that are modified to have the
+   *    best host as the only host.
+   * @throws IOException
+   */
+  public List<InputSplit> getNewFileSplits() throws IOException {
+    solve();
+    FileSplit[] result = new FileSplit[realSplits.length];
+    int left = 0;
+    int right = realSplits.length - 1;
+    for(int i=0; i < splits.length; ++i) {
+      if (splits[i].isAssigned) {
+        // copy the split and fix up the locations
+        ((TeraFileSplit) realSplits[i]).setLocations
+           (new String[]{splits[i].locations.get(0).hostname});
+        result[left++] = realSplits[i];
+      } else {
+        result[right--] = realSplits[i];
+      }
+    }
+    List<InputSplit> ret = new ArrayList<InputSplit>();
+    for (FileSplit fs : result) {
+      ret.add(fs);
+    }
+    return ret;
+  }
+
+  public static void main(String[] args) throws IOException {
+    TeraScheduler problem = new TeraScheduler("block-loc.txt", "nodes");
+    for(Host host: problem.hosts) {
+      System.out.println(host);
+    }
+    LOG.info("starting solve");
+    problem.solve();
+    List<Split> leftOvers = new ArrayList<Split>();
+    for(int i=0; i < problem.splits.length; ++i) {
+      if (problem.splits[i].isAssigned) {
+        System.out.println("sched: " + problem.splits[i]);        
+      } else {
+        leftOvers.add(problem.splits[i]);
+      }
+    }
+    for(Split cur: leftOvers) {
+      System.out.println("left: " + cur);
+    }
+    System.out.println("left over: " + leftOvers.size());
+    LOG.info("done");
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraScheduler.java
------------------------------------------------------------------------------
    svn:eol-style = native
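
For reference, the standalone main() above reads two plain-text inputs from the working directory: "nodes", with one hostname per line, and "block-loc.txt", where each line names a split file followed by the hostnames holding its blocks. A minimal sketch that drives it with synthetic data; the demo class, hostnames, and file contents are invented for illustration, and it sits in the same package because TeraScheduler is package-private:

    package org.apache.tez.mapreduce.examples.terasort;

    import java.io.FileWriter;
    import java.io.IOException;

    // Hypothetical driver: writes tiny synthetic "nodes" and "block-loc.txt"
    // files, then delegates to TeraScheduler.main(), which solves the
    // schedule and prints the assigned and left-over splits.
    public class TeraSchedulerDemo {
      public static void main(String[] args) throws IOException {
        try (FileWriter nodes = new FileWriter("nodes")) {
          nodes.write("host1\nhost2\n");          // one hostname per line
        }
        try (FileWriter blocks = new FileWriter("block-loc.txt")) {
          // each line: <split file name> <host> <host> ...
          blocks.write("part-0 host1 host2\n");
          blocks.write("part-1 host2\n");
        }
        TeraScheduler.main(new String[0]);
      }
    }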

Added: incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraSort.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraSort.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraSort.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraSort.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,329 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.examples.terasort;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.net.URI;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Generates the sampled split points, launches the job, and waits for it to
+ * finish. 
+ * <p>
+ * To run the program: 
+ * <b>bin/hadoop jar hadoop-*-examples.jar terasort in-dir out-dir</b>
+ */
+public class TeraSort extends Configured implements Tool {
+  private static final Log LOG = LogFactory.getLog(TeraSort.class);
+  static String SIMPLE_PARTITIONER = "mapreduce.terasort.simplepartitioner";
+  static String OUTPUT_REPLICATION = "mapreduce.terasort.output.replication";
+
+  /**
+   * A partitioner that splits text keys into roughly equal partitions
+   * in a global sorted order.
+   */
+  static class TotalOrderPartitioner extends Partitioner<Text,Text>
+      implements Configurable {
+    private TrieNode trie;
+    private Text[] splitPoints;
+    private Configuration conf;
+
+    /**
+     * A generic trie node
+     */
+    static abstract class TrieNode {
+      private int level;
+      TrieNode(int level) {
+        this.level = level;
+      }
+      abstract int findPartition(Text key);
+      abstract void print(PrintStream strm) throws IOException;
+      int getLevel() {
+        return level;
+      }
+    }
+
+    /**
+     * An inner trie node that contains 256 children based on the next
+     * character.
+     */
+    static class InnerTrieNode extends TrieNode {
+      private TrieNode[] child = new TrieNode[256];
+      
+      InnerTrieNode(int level) {
+        super(level);
+      }
+      int findPartition(Text key) {
+        int level = getLevel();
+        if (key.getLength() <= level) {
+          return child[0].findPartition(key);
+        }
+        return child[key.getBytes()[level] & 0xff].findPartition(key);
+      }
+      void setChild(int idx, TrieNode child) {
+        this.child[idx] = child;
+      }
+      void print(PrintStream strm) throws IOException {
+        for(int ch=0; ch < 256; ++ch) {
+          for(int i = 0; i < 2*getLevel(); ++i) {
+            strm.print(' ');
+          }
+          strm.print(ch);
+          strm.println(" ->");
+          if (child[ch] != null) {
+            child[ch].print(strm);
+          }
+        }
+      }
+    }
+
+    /**
+     * A leaf trie node that does string compares to figure out where the given
+     * key belongs between lower..upper.
+     */
+    static class LeafTrieNode extends TrieNode {
+      int lower;
+      int upper;
+      Text[] splitPoints;
+      LeafTrieNode(int level, Text[] splitPoints, int lower, int upper) {
+        super(level);
+        this.splitPoints = splitPoints;
+        this.lower = lower;
+        this.upper = upper;
+      }
+      int findPartition(Text key) {
+        for(int i=lower; i<upper; ++i) {
+          if (splitPoints[i].compareTo(key) > 0) {
+            return i;
+          }
+        }
+        return upper;
+      }
+      void print(PrintStream strm) throws IOException {
+        for(int i = 0; i < 2*getLevel(); ++i) {
+          strm.print(' ');
+        }
+        strm.print(lower);
+        strm.print(", ");
+        strm.println(upper);
+      }
+    }
+
+
+    /**
+     * Read the cut points from the given sequence file.
+     * @param fs the file system
+     * @param p the path to read
+     * @param conf the job configuration
+     * @return the strings to split the partitions on
+     * @throws IOException
+     */
+    private static Text[] readPartitions(FileSystem fs, Path p,
+        Configuration conf) throws IOException {
+      int reduces = conf.getInt(MRJobConfig.NUM_REDUCES, 1);
+      Text[] result = new Text[reduces - 1];
+      DataInputStream reader = fs.open(p);
+      for(int i=0; i < reduces - 1; ++i) {
+        result[i] = new Text();
+        result[i].readFields(reader);
+      }
+      reader.close();
+      return result;
+    }
+
+    /**
+     * Given a sorted set of cut points, build a trie that will find the correct
+     * partition quickly.
+     * @param splits the list of cut points
+     * @param lower the lower bound of partitions 0..numPartitions-1
+     * @param upper the upper bound of partitions 0..numPartitions-1
+     * @param prefix the prefix that we have already checked against
+     * @param maxDepth the maximum depth we will build a trie for
+     * @return the trie node that will divide the splits correctly
+     */
+    private static TrieNode buildTrie(Text[] splits, int lower, int upper, 
+                                      Text prefix, int maxDepth) {
+      int depth = prefix.getLength();
+      if (depth >= maxDepth || lower == upper) {
+        return new LeafTrieNode(depth, splits, lower, upper);
+      }
+      InnerTrieNode result = new InnerTrieNode(depth);
+      Text trial = new Text(prefix);
+      // append an extra byte on to the prefix
+      trial.append(new byte[1], 0, 1);
+      int currentBound = lower;
+      for(int ch = 0; ch < 255; ++ch) {
+        trial.getBytes()[depth] = (byte) (ch + 1);
+        lower = currentBound;
+        while (currentBound < upper) {
+          if (splits[currentBound].compareTo(trial) >= 0) {
+            break;
+          }
+          currentBound += 1;
+        }
+        trial.getBytes()[depth] = (byte) ch;
+        result.child[ch] = buildTrie(splits, lower, currentBound, trial, 
+                                     maxDepth);
+      }
+      // pick up the rest
+      trial.getBytes()[depth] = (byte) 255;
+      result.child[255] = buildTrie(splits, currentBound, upper, trial,
+                                    maxDepth);
+      return result;
+    }
+
+    public void setConf(Configuration conf) {
+      try {
+        FileSystem fs = FileSystem.getLocal(conf);
+        this.conf = conf;
+        Path partFile = new Path(TeraInputFormat.PARTITION_FILENAME);
+        splitPoints = readPartitions(fs, partFile, conf);
+        trie = buildTrie(splitPoints, 0, splitPoints.length, new Text(), 2);
+      } catch (IOException ie) {
+        throw new IllegalArgumentException("can't read partitions file", ie);
+      }
+    }
+
+    public Configuration getConf() {
+      return conf;
+    }
+    
+    public TotalOrderPartitioner() {
+    }
+
+    public int getPartition(Text key, Text value, int numPartitions) {
+      return trie.findPartition(key);
+    }
+    
+  }
+  
+  /**
+   * A total order partitioner that assigns keys based on their first 
+   * PREFIX_LENGTH bytes, assuming a flat distribution.
+   */
+  public static class SimplePartitioner extends Partitioner<Text, Text>
+      implements Configurable {
+    int prefixesPerReduce;
+    private static final int PREFIX_LENGTH = 3;
+    private Configuration conf = null;
+    public void setConf(Configuration conf) {
+      this.conf = conf;
+      prefixesPerReduce = (int) Math.ceil((1 << (8 * PREFIX_LENGTH)) / 
+        (float) conf.getInt(MRJobConfig.NUM_REDUCES, 1));
+    }
+    
+    public Configuration getConf() {
+      return conf;
+    }
+    
+    @Override
+    public int getPartition(Text key, Text value, int numPartitions) {
+      byte[] bytes = key.getBytes();
+      int len = Math.min(PREFIX_LENGTH, key.getLength());
+      int prefix = 0;
+      for(int i=0; i < len; ++i) {
+        prefix = (prefix << 8) | (0xff & bytes[i]);
+      }
+      return prefix / prefixesPerReduce;
+    }
+  }
+
+  public static boolean getUseSimplePartitioner(JobContext job) {
+    return job.getConfiguration().getBoolean(SIMPLE_PARTITIONER, false);
+  }
+
+  public static void setUseSimplePartitioner(Job job, boolean value) {
+    job.getConfiguration().setBoolean(SIMPLE_PARTITIONER, value);
+  }
+
+  public static int getOutputReplication(JobContext job) {
+    return job.getConfiguration().getInt(OUTPUT_REPLICATION, 1);
+  }
+
+  public static void setOutputReplication(Job job, int value) {
+    job.getConfiguration().setInt(OUTPUT_REPLICATION, value);
+  }
+
+  public int run(String[] args) throws Exception {
+    LOG.info("starting");
+    Job job = Job.getInstance(getConf());
+    Path inputDir = new Path(args[0]);
+    Path outputDir = new Path(args[1]);
+    boolean useSimplePartitioner = getUseSimplePartitioner(job);
+    TeraInputFormat.setInputPaths(job, inputDir);
+    FileOutputFormat.setOutputPath(job, outputDir);
+    job.setJobName("TeraSort");
+    job.setJarByClass(TeraSort.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);
+    job.setInputFormatClass(TeraInputFormat.class);
+    job.setOutputFormatClass(TeraOutputFormat.class);
+    if (useSimplePartitioner) {
+      job.setPartitionerClass(SimplePartitioner.class);
+    } else {
+      long start = System.currentTimeMillis();
+      Path partitionFile = new Path(outputDir, 
+                                    TeraInputFormat.PARTITION_FILENAME);
+      URI partitionUri = new URI(partitionFile.toString() +
+                                 "#" + TeraInputFormat.PARTITION_FILENAME);
+      try {
+        TeraInputFormat.writePartitionFile(job, partitionFile);
+      } catch (Throwable e) {
+        LOG.error(e.getMessage());
+        return -1;
+      }
+      job.addCacheFile(partitionUri);  
+      long end = System.currentTimeMillis();
+      System.out.println("Spent " + (end - start) + "ms computing partitions.");
+      job.setPartitionerClass(TotalOrderPartitioner.class);
+    }
+    
+    job.getConfiguration().setInt("dfs.replication", getOutputReplication(job));
+    TeraOutputFormat.setFinalSync(job, true);
+    int ret = job.waitForCompletion(true) ? 0 : 1;
+    LOG.info("done");
+    return ret;
+  }
+
+  /**
+   * @param args
+   */
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(), new TeraSort(), args);
+    System.exit(res);
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraSort.java
------------------------------------------------------------------------------
    svn:eol-style = native
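
The TotalOrderPartitioner above routes a key to reduce i when sample[i-1] <= key < sample[i]; the two-level trie only accelerates that lookup. A minimal sketch of the same rule using a plain binary search in place of the trie (the class name and split points are invented for illustration):

    import java.util.Arrays;

    public class PartitionLookupDemo {
      // returns i such that splitPoints[i-1] <= key < splitPoints[i],
      // i.e. the number of split points that are <= key
      static int findPartition(String[] splitPoints, String key) {
        int pos = Arrays.binarySearch(splitPoints, key);
        // an exact match at index i belongs to partition i + 1; a miss
        // returns (-(insertion point) - 1), and the insertion point is
        // exactly the number of split points less than the key
        return pos >= 0 ? pos + 1 : -(pos + 1);
      }

      public static void main(String[] args) {
        String[] splitPoints = {"f", "q"};  // N-1 samples for N = 3 reduces
        System.out.println(findPartition(splitPoints, "apple"));  // 0
        System.out.println(findPartition(splitPoints, "mango"));  // 1
        System.out.println(findPartition(splitPoints, "zebra"));  // 2
      }
    }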

Added: incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraValidate.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraValidate.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraValidate.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraValidate.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.examples.terasort;
+
+import java.io.IOException;
+import java.util.zip.Checksum;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.PureJavaCrc32;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Generates one mapper per file that checks that the keys
+ * are sorted within each file. The mapper also emits
+ * "$file:begin", first key and "$file:end", last key records. The reduce
+ * verifies that all of the start/end items are in order.
+ * Any output from the reduce is a problem report.
+ * <p>
+ * To run the program: 
+ * <b>bin/hadoop jar hadoop-*-examples.jar teravalidate out-dir report-dir</b>
+ * <p>
+ * If there is any output, something is wrong and the output of the reduce
+ * will have the problem report.
+ */
+public class TeraValidate extends Configured implements Tool {
+  private static final Text ERROR = new Text("error");
+  private static final Text CHECKSUM = new Text("checksum");
+  
+  private static String textifyBytes(Text t) {
+    BytesWritable b = new BytesWritable();
+    b.set(t.getBytes(), 0, t.getLength());
+    return b.toString();
+  }
+
+  static class ValidateMapper extends Mapper<Text,Text,Text,Text> {
+    private Text lastKey;
+    private String filename;
+    private Unsigned16 checksum = new Unsigned16();
+    private Unsigned16 tmp = new Unsigned16();
+    private Checksum crc32 = new PureJavaCrc32();
+
+    /**
+     * Get the final component of the input file's path.
+     * @param split the input split
+     * @return the file name (such as "part-r-00000") of the input
+     */
+    private String getFilename(FileSplit split) {
+      return split.getPath().getName();
+    }
+
+    public void map(Text key, Text value, Context context) 
+        throws IOException, InterruptedException {
+      if (lastKey == null) {
+        FileSplit fs = (FileSplit) context.getInputSplit();
+        filename = getFilename(fs);
+        context.write(new Text(filename + ":begin"), key);
+        lastKey = new Text();
+      } else {
+        if (key.compareTo(lastKey) < 0) {
+          context.write(ERROR, new Text("misorder in " + filename + 
+                                         " between " + textifyBytes(lastKey) + 
+                                         " and " + textifyBytes(key)));
+        }
+      }
+      // compute the crc of the key and value and add it to the sum
+      crc32.reset();
+      crc32.update(key.getBytes(), 0, key.getLength());
+      crc32.update(value.getBytes(), 0, value.getLength());
+      tmp.set(crc32.getValue());
+      checksum.add(tmp);
+      lastKey.set(key);
+    }
+    
+    public void cleanup(Context context) 
+        throws IOException, InterruptedException  {
+      if (lastKey != null) {
+        context.write(new Text(filename + ":end"), lastKey);
+        context.write(CHECKSUM, new Text(checksum.toString()));
+      }
+    }
+  }
+
+  /**
+   * Check the boundaries between the output files by making sure that the
+   * boundary keys are always increasing.
+   * Also passes any error reports along intact.
+   */
+  static class ValidateReducer extends Reducer<Text,Text,Text,Text> {
+    private boolean firstKey = true;
+    private Text lastKey = new Text();
+    private Text lastValue = new Text();
+    public void reduce(Text key, Iterable<Text> values,
+        Context context) throws IOException, InterruptedException  {
+      if (ERROR.equals(key)) {
+        for (Text val : values) {
+          context.write(key, val);
+        }
+      } else if (CHECKSUM.equals(key)) {
+        Unsigned16 tmp = new Unsigned16();
+        Unsigned16 sum = new Unsigned16();
+        for (Text val : values) {
+          tmp.set(val.toString());
+          sum.add(tmp);
+        }
+        context.write(CHECKSUM, new Text(sum.toString()));
+      } else {
+        Text value = values.iterator().next();
+        if (firstKey) {
+          firstKey = false;
+        } else {
+          if (value.compareTo(lastValue) < 0) {
+            context.write(ERROR, 
+                           new Text("bad key partitioning:\n  file " + 
+                                    lastKey + " key " + 
+                                    textifyBytes(lastValue) +
+                                    "\n  file " + key + " key " + 
+                                    textifyBytes(value)));
+          }
+        }
+        lastKey.set(key);
+        lastValue.set(value);
+      }
+    }
+    
+  }
+
+  private static void usage() throws IOException {
+    System.err.println("teravalidate <out-dir> <report-dir>");
+  }
+
+  public int run(String[] args) throws Exception {
+    Job job = Job.getInstance(getConf());
+    if (args.length != 2) {
+      usage();
+      return 1;
+    }
+    TeraInputFormat.setInputPaths(job, new Path(args[0]));
+    FileOutputFormat.setOutputPath(job, new Path(args[1]));
+    job.setJobName("TeraValidate");
+    job.setJarByClass(TeraValidate.class);
+    job.setMapperClass(ValidateMapper.class);
+    job.setReducerClass(ValidateReducer.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);
+    // force a single reducer
+    job.setNumReduceTasks(1);
+    // force a single split 
+    FileInputFormat.setMinInputSplitSize(job, Long.MAX_VALUE);
+    job.setInputFormatClass(TeraInputFormat.class);
+    return job.waitForCompletion(true) ? 0 : 1;
+  }
+
+  /**
+   * @param args
+   */
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(), new TeraValidate(), args);
+    System.exit(res);
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/TeraValidate.java
------------------------------------------------------------------------------
    svn:eol-style = native
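
Beyond ordering, ValidateMapper above folds a CRC32 of every key/value pair into a 128-bit running sum, and ValidateReducer adds the per-file sums into one grand total. A small sketch of that accumulation, using java.util.zip.CRC32 and BigInteger as stand-ins for PureJavaCrc32 and Unsigned16 (the records are invented for illustration):

    import java.math.BigInteger;
    import java.nio.charset.StandardCharsets;
    import java.util.zip.CRC32;

    public class ChecksumDemo {
      public static void main(String[] args) {
        String[][] records = {{"key1", "value1"}, {"key2", "value2"}};
        CRC32 crc32 = new CRC32();
        BigInteger checksum = BigInteger.ZERO;
        for (String[] kv : records) {
          // CRC the key bytes followed by the value bytes, then add the
          // 32-bit result into the wide accumulator
          crc32.reset();
          crc32.update(kv[0].getBytes(StandardCharsets.UTF_8));
          crc32.update(kv[1].getBytes(StandardCharsets.UTF_8));
          checksum = checksum.add(BigInteger.valueOf(crc32.getValue()));
        }
        System.out.println("checksum: " + checksum.toString(16));
      }
    }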

Added: incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/Unsigned16.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/Unsigned16.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/Unsigned16.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/Unsigned16.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.examples.terasort;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * An unsigned 16 byte integer class that supports addition, multiplication,
+ * and left shifts.
+ */
+class Unsigned16 implements Writable {
+  private long hi8;
+  private long lo8;
+
+  public Unsigned16() {
+    hi8 = 0;
+    lo8 = 0;
+  }
+
+  public Unsigned16(long l) {
+    hi8 = 0;
+    lo8 = l;
+  }
+
+  public Unsigned16(Unsigned16 other) {
+    hi8 = other.hi8;
+    lo8 = other.lo8;
+  }
+  
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof Unsigned16) {
+      Unsigned16 other = (Unsigned16) o;
+      return other.hi8 == hi8 && other.lo8 == lo8;
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return (int) lo8;
+  }
+
+  /**
+   * Parse a hex string
+   * @param s the hex string
+   */
+  public Unsigned16(String s) throws NumberFormatException {
+    set(s);
+  }
+
+  /**
+   * Set the number from a hex string
+   * @param s the number in hexadecimal
+   * @throws NumberFormatException if the number is invalid
+   */
+  public void set(String s) throws NumberFormatException {
+    hi8 = 0;
+    lo8 = 0;
+    final long lastDigit = 0xfl << 60;
+    for (int i = 0; i < s.length(); ++i) {
+      int digit = getHexDigit(s.charAt(i));
+      if ((lastDigit & hi8) != 0) {
+        throw new NumberFormatException(s + " overflowed 16 bytes");
+      }
+      hi8 <<= 4;
+      hi8 |= (lo8 & lastDigit) >>> 60;
+      lo8 <<= 4;
+      lo8 |= digit;
+    }    
+  }
+
+  /**
+   * Set the number to a given long.
+   * @param l the new value, which is treated as an unsigned number
+   */
+  public void set(long l) {
+    lo8 = l;
+    hi8 = 0;
+  }
+
+  /**
+   * Map a hexadecimal character into a digit.
+   * @param ch the character
+   * @return the digit from 0 to 15
+   * @throws NumberFormatException
+   */
+  private static int getHexDigit(char ch) throws NumberFormatException {
+    if (ch >= '0' && ch <= '9') {
+      return ch - '0';
+    }
+    if (ch >= 'a' && ch <= 'f') {
+      return ch - 'a' + 10;
+    }
+    if (ch >= 'A' && ch <= 'F') {
+      return ch - 'A' + 10;
+    }
+    throw new NumberFormatException(ch + " is not a valid hex digit");
+  }
+
+  private static final Unsigned16 TEN = new Unsigned16(10);
+
+  public static Unsigned16 fromDecimal(String s) throws NumberFormatException {
+    Unsigned16 result = new Unsigned16();
+    Unsigned16 tmp = new Unsigned16();
+    for(int i=0; i < s.length(); i++) {
+      char ch = s.charAt(i);
+      if (ch < '0' || ch > '9') {
+        throw new NumberFormatException(ch + " not a valid decimal digit");
+      }
+      int digit = ch - '0';
+      result.multiply(TEN);
+      tmp.set(digit);
+      result.add(tmp);
+    }
+    return result;
+  }
+
+  /**
+   * Return the number as a hex string.
+   */
+  public String toString() {
+    if (hi8 == 0) {
+      return Long.toHexString(lo8);
+    } else {
+      StringBuilder result = new StringBuilder();
+      result.append(Long.toHexString(hi8));
+      String loString = Long.toHexString(lo8);
+      for(int i=loString.length(); i < 16; ++i) {
+        result.append('0');
+      }
+      result.append(loString);
+      return result.toString();
+    }
+  }
+
+  /**
+   * Get a given byte from the number.
+   * @param b the byte to get with 0 meaning the most significant byte
+   * @return the byte or 0 if b is outside of 0..15
+   */
+  public byte getByte(int b) {
+    if (b >= 0 && b < 16) {
+      if (b < 8) {
+        return (byte) (hi8 >> (56 - 8*b));
+      } else {
+        return (byte) (lo8 >> (120 - 8*b));
+      }
+    }
+    return 0;
+  }
+
+  /**
+   * Get the hexadecimal digit at the given position.
+   * @param p the digit position to get with 0 meaning the most significant
+   * @return the character or '0' if p is outside of 0..31
+   */
+  public char getHexDigit(int p) {
+    byte digit = getByte(p / 2);
+    if (p % 2 == 0) {
+      digit >>>= 4;
+    }
+    digit &= 0xf;
+    if (digit < 10) {
+      return (char) ('0' + digit);
+    } else {
+      return (char) ('A' + digit - 10);
+    }
+  }
+
+  /**
+   * Get the high 8 bytes as a long.
+   */
+  public long getHigh8() {
+    return hi8;
+  }
+  
+  /**
+   * Get the low 8 bytes as a long.
+   */
+  public long getLow8() {
+    return lo8;
+  }
+
+  /**
+   * Multiply the current number by a 16 byte unsigned integer. Overflow is not
+   * detected and the result is the low 16 bytes of the product. The numbers
+   * are divided into 32 and 31 bit chunks so that the product of two chunks
+   * fits in the unsigned 63 bits of a long.
+   * @param b the other number
+   */
+  void multiply(Unsigned16 b) {
+    // divide the left into 4 32 bit chunks
+    long[] left = new long[4];
+    left[0] = lo8 & 0xffffffffl;
+    left[1] = lo8 >>> 32;
+    left[2] = hi8 & 0xffffffffl;
+    left[3] = hi8 >>> 32;
+    // divide the right into 5 31 bit chunks
+    long[] right = new long[5];
+    right[0] = b.lo8 & 0x7fffffffl;
+    right[1] = (b.lo8 >>> 31) & 0x7fffffffl;
+    right[2] = (b.lo8 >>> 62) + ((b.hi8 & 0x1fffffffl) << 2);
+    right[3] = (b.hi8 >>> 29) & 0x7fffffffl;
+    right[4] = (b.hi8 >>> 60);
+    // clear the cur value
+    set(0);
+    Unsigned16 tmp = new Unsigned16();
+    for(int l=0; l < 4; ++l) {
+      for (int r=0; r < 5; ++r) {
+        long prod = left[l] * right[r];
+        if (prod != 0) {
+          int off = l*32 + r*31;
+          tmp.set(prod);
+          tmp.shiftLeft(off);
+          add(tmp);
+        }
+      }
+    }
+  }
+
+  /**
+   * Add the given number into the current number.
+   * @param b the other number
+   */
+  public void add(Unsigned16 b) {
+    long sumHi;
+    long sumLo;
+    long  reshibit, hibit0, hibit1;
+
+    sumHi = hi8 + b.hi8;
+
+    hibit0 = (lo8 & 0x8000000000000000L);
+    hibit1 = (b.lo8 & 0x8000000000000000L);
+    sumLo = lo8 + b.lo8;
+    reshibit = (sumLo & 0x8000000000000000L);
+    if ((hibit0 & hibit1) != 0 | ((hibit0 ^ hibit1) != 0 && reshibit == 0))
+      sumHi++;  /* add carry bit */
+    hi8 = sumHi;
+    lo8 = sumLo;
+  }
+
+  /**
+   * Shift the number left by a given number of bit positions, keeping only
+   * the low order 128 bits of the result.
+   * @param bits the bit positions to shift by
+   */
+  public void shiftLeft(int bits) {
+    if (bits != 0) {
+      if (bits < 64) {
+        hi8 <<= bits;
+        hi8 |= (lo8 >>> (64 - bits));
+        lo8 <<= bits;
+      } else if (bits < 128) {
+        hi8 = lo8 << (bits - 64);
+        lo8 = 0;
+      } else {
+        hi8 = 0;
+        lo8 = 0;
+      }
+    }
+  }
+  
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    hi8 = in.readLong();
+    lo8 = in.readLong();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeLong(hi8);
+    out.writeLong(lo8);
+  }
+  
+  
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/Unsigned16.java
------------------------------------------------------------------------------
    svn:eol-style = native
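
Unsigned16 implements 128-bit arithmetic that silently wraps on overflow, so its results should agree with BigInteger arithmetic reduced modulo 2^128. A quick cross-check sketch (the demo class is invented and sits in the same package because Unsigned16 is package-private):

    package org.apache.tez.mapreduce.examples.terasort;

    import java.math.BigInteger;

    public class Unsigned16Demo {
      private static final BigInteger MOD = BigInteger.ONE.shiftLeft(128);

      public static void main(String[] args) {
        String hex = "ffffffffffffffffffffffffffffffff";  // 2^128 - 1
        Unsigned16 a = new Unsigned16(hex);
        a.multiply(new Unsigned16(2));  // keeps the low 16 bytes: 2^128 - 2
        BigInteger expected =
            new BigInteger(hex, 16).multiply(BigInteger.valueOf(2)).mod(MOD);
        System.out.println(a);                      // fffffffffffffffffffffffffffffffe
        System.out.println(expected.toString(16));  // same value
      }
    }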

Added: incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/job_history_summary.py
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/job_history_summary.py?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/job_history_summary.py (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/job_history_summary.py Thu Apr 18 23:54:18 2013
@@ -0,0 +1,100 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import re
+import sys
+
+pat = re.compile('(?P<name>[^=]+)="(?P<value>[^"]*)" *')
+counterPat = re.compile('(?P<name>[^:]+):(?P<value>[^,]*),?')
+
+def parse(tail):
+  result = {}
+  for n,v in re.findall(pat, tail):
+    result[n] = v
+  return result
+
+mapStartTime = {}
+mapEndTime = {}
+reduceStartTime = {}
+reduceShuffleTime = {}
+reduceSortTime = {}
+reduceEndTime = {}
+reduceBytes = {}
+
+for line in sys.stdin:
+  words = line.split(" ",1)
+  event = words[0]
+  attrs = parse(words[1])
+  if event == 'MapAttempt':
+    if attrs.has_key("START_TIME"):
+      mapStartTime[attrs["TASKID"]] = int(attrs["START_TIME"])/1000
+    elif attrs.has_key("FINISH_TIME"):
+      mapEndTime[attrs["TASKID"]] = int(attrs["FINISH_TIME"])/1000
+  elif event == 'ReduceAttempt':
+    if attrs.has_key("START_TIME"):
+      reduceStartTime[attrs["TASKID"]] = int(attrs["START_TIME"]) / 1000
+    elif attrs.has_key("FINISH_TIME"):
+      reduceShuffleTime[attrs["TASKID"]] = int(attrs["SHUFFLE_FINISHED"])/1000
+      reduceSortTime[attrs["TASKID"]] = int(attrs["SORT_FINISHED"])/1000
+      reduceEndTime[attrs["TASKID"]] = int(attrs["FINISH_TIME"])/1000
+  elif event == 'Task':
+    if attrs["TASK_TYPE"] == "REDUCE" and attrs.has_key("COUNTERS"):
+      for n,v in re.findall(counterPat, attrs["COUNTERS"]):
+        if n == "File Systems.HDFS bytes written":
+          reduceBytes[attrs["TASKID"]] = int(v)
+
+runningMaps = {}
+shufflingReduces = {}
+sortingReduces = {}
+runningReduces = {}
+startTime = min(reduce(min, mapStartTime.values()),
+                reduce(min, reduceStartTime.values()))
+endTime = max(reduce(max, mapEndTime.values()),
+              reduce(max, reduceEndTime.values()))
+
+reduces = reduceBytes.keys()
+reduces.sort()
+
+print "Name reduce-output-bytes shuffle-finish reduce-finish"
+for r in reduces:
+  print r, reduceBytes[r], reduceShuffleTime[r] - startTime,
+  print reduceEndTime[r] - startTime
+
+print
+
+for t in range(startTime, endTime):
+  runningMaps[t] = 0
+  shufflingReduces[t] = 0
+  sortingReduces[t] = 0
+  runningReduces[t] = 0
+
+for map in mapStartTime.keys():
+  for t in range(mapStartTime[map], mapEndTime[map]):
+    runningMaps[t] += 1
+for reduce in reduceStartTime.keys():
+  for t in range(reduceStartTime[reduce], reduceShuffleTime[reduce]):
+    shufflingReduces[t] += 1
+  for t in range(reduceShuffleTime[reduce], reduceSortTime[reduce]):
+    sortingReduces[t] += 1
+  for t in range(reduceSortTime[reduce], reduceEndTime[reduce]):
+    runningReduces[t] += 1
+
+print "time maps shuffle merge reduce"
+for t in range(startTime, endTime):
+  print t - startTime, runningMaps[t], shufflingReduces[t], sortingReduces[t], 
+  print runningReduces[t]

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/job_history_summary.py
------------------------------------------------------------------------------
    svn:eol-style = native
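
job_history_summary.py consumes pre-MR2 job history event lines of the form EVENT NAME="VALUE" NAME="VALUE" ... on stdin. An illustrative fragment of the input it expects (hand-written to match the regexes above, not captured from a real cluster):

    MapAttempt TASKID="task_0001_m_000000" START_TIME="1208994158016"
    MapAttempt TASKID="task_0001_m_000000" FINISH_TIME="1208994193521"
    ReduceAttempt TASKID="task_0001_r_000000" START_TIME="1208994168016"
    ReduceAttempt TASKID="task_0001_r_000000" SHUFFLE_FINISHED="1208994219036" SORT_FINISHED="1208994227950" FINISH_TIME="1208994261418"
    Task TASKID="task_0001_r_000000" TASK_TYPE="REDUCE" COUNTERS="File Systems.HDFS bytes written:1077467667,"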

Added: incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/package.html
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/package.html?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/package.html (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/package.html Thu Apr 18 23:54:18 2013
@@ -0,0 +1,114 @@
+<html>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<body>
+This package consists of 3 map/reduce applications for Hadoop to
+compete in the annual <a
+href="http://www.hpl.hp.com/hosted/sortbenchmark" target="_top">terabyte sort</a>
+competition.
+
+<ul>
+<li><b>TeraGen</b> is a map/reduce program to generate the data.
+<li><b>TeraSort</b> samples the input data and uses map/reduce to
+    sort the data into a total order.
+<li><b>TeraValidate</b> is a map/reduce program that validates the
+    output is sorted.
+</ul>
+
+<p>
+
+<b>TeraGen</b> generates output data that is byte for byte
+equivalent to the C version including the newlines and specific
+keys. It divides the desired number of rows by the desired number of
+tasks and assigns ranges of rows to each map. The map jumps the random
+number generator to the correct value for the first row and generates
+the following rows.
+
+<p>
+
+<b>TeraSort</b> is a standard map/reduce sort, except for a custom
+partitioner that uses a sorted list of <i>N-1</i> sampled keys that define
+the key range for each reduce. In particular, all keys such that
+<i>sample[i-1] &lt;= key &lt; sample[i]</i> are sent to reduce
+<i>i</i>. This guarantees that the outputs of reduce <i>i</i> are all
+less than the outputs of reduce <i>i+1</i>. To speed up the
+partitioning, the partitioner builds a two-level trie that quickly
+indexes into the list of sample keys based on the first two bytes of
+the key. TeraSort generates the sample keys by sampling the input
+before the job is submitted and writing the list of keys into HDFS.
+The input and output formats, which are used by all 3 applications,
+read and write the text files in the right format. The output of the
+reduce has replication set to 1, instead of the default 3, because the
+contest does not require the output data to be replicated onto multiple
+nodes.
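+
+<p>
+
+As a worked illustration (the sample keys here are invented): with 3
+reduces and samples <i>s[0]="f"</i>, <i>s[1]="q"</i>, the key "apple"
+goes to reduce 0 since "apple" &lt; <i>s[0]</i>, "mango" goes to
+reduce 1 since <i>s[0]</i> &lt;= "mango" &lt; <i>s[1]</i>, and "zebra"
+goes to reduce 2 since it is &gt;= <i>s[1]</i>.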
+
+<p>
+
+<b>TeraValidate</b> ensures that the output is globally sorted. It
+creates one map per file in the output directory and each map ensures that
+each key is greater than or equal to the previous one. The map also generates
+records with the first and last keys of the file and the reduce
+ensures that the first key of file <i>i</i> is greater than the last key of
+file <i>i-1</i>. Any problems are reported as output of the reduce with the
+keys that are out of order.
+
+<p>
+
+In May 2008, Owen O'Malley ran this code on a 910 node cluster and
+sorted the 10 billion records (1 TB) in 209 seconds (3.48 minutes) to
+win the annual general purpose (daytona)
+<a href="http://www.hpl.hp.com/hosted/sortbenchmark/">terabyte sort
+benchmark</a>.
+
+<p>
+
+The cluster statistics were:
+<ul>
+<li> 910 nodes
+<li> 4 dual core Xeons @ 2.0 GHz per node
+<li> 4 SATA disks per node
+<li> 8 GB RAM per node
+<li> 1 gigabit Ethernet on each node
+<li> 40 nodes per rack
+<li> 8 gigabit Ethernet uplinks from each rack to the core
+<li> Red Hat Enterprise Linux Server Release 5.1 (kernel 2.6.18)
+<li> Sun Java JDK 1.6.0_05-b13
+</ul>
+
+<p>
+
+The test was on Hadoop trunk (pre-0.18) patched with <a
+href="http://issues.apache.org/jira/browse/HADOOP-3443">HADOOP-3443</a>
+and <a
+href="http://issues.apache.org/jira/browse/HADOOP-3446">HADOOP-3446</a>,
+which were required to remove intermediate writes to disk.
+TeraGen used
+1800 tasks to generate a total of 10 billion rows in HDFS, with a
+block size of 1024 MB.
+TeraSort was configured with 1800 maps and 1800 reduces, and
+<i>mapreduce.task.io.sort.mb</i>,
+<i>mapreduce.task.io.sort.factor</i>,
+<i>fs.inmemory.size.mb</i>, and task heap size
+sufficient that transient data was never spilled to disk, other than at
+the end of the map. The sampler looked at 100,000 keys to determine the
+reduce boundaries, which led to imperfect balancing with reduce
+outputs ranging from 337 MB to 872 MB.
+
+</body>
+</html>

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/terasort/package.html
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/test/java/org/apache/tez/mapreduce/examples/terasort/TestTeraSort.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/test/java/org/apache/tez/mapreduce/examples/terasort/TestTeraSort.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/test/java/org/apache/tez/mapreduce/examples/terasort/TestTeraSort.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/test/java/org/apache/tez/mapreduce/examples/terasort/TestTeraSort.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.examples.terasort;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.HadoopTestCase;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.tez.mapreduce.examples.terasort.TeraGen;
+import org.apache.tez.mapreduce.examples.terasort.TeraSort;
+import org.apache.tez.mapreduce.examples.terasort.TeraValidate;
+import org.junit.Ignore;
+
+@Ignore
+public class TestTeraSort extends HadoopTestCase {
+  
+  public TestTeraSort()
+      throws IOException {
+    super(CLUSTER_MR, DFS_FS, 1, 1);
+  }
+
+  protected void tearDown() throws Exception {
+    getFileSystem().delete(new Path(TEST_DIR), true);
+    super.tearDown();
+  }
+  
+  // Input/Output paths for sort
+  private static final String TEST_DIR = 
+    new File(System.getProperty("test.build.data", "/tmp"), "terasort")
+    .getAbsolutePath();
+  private static final Path SORT_INPUT_PATH = new Path(TEST_DIR, "sortin");
+  private static final Path SORT_OUTPUT_PATH = new Path(TEST_DIR, "sortout");
+  private static final Path TERA_OUTPUT_PATH = new Path(TEST_DIR, "validate");
+  private static final String NUM_ROWS = "100"; 
+
+  private void runTeraGen(Configuration conf, Path sortInput) 
+      throws Exception {
+    String[] genArgs = {NUM_ROWS, sortInput.toString()};
+    
+    // Run TeraGen
+    assertEquals(ToolRunner.run(conf, new TeraGen(), genArgs), 0);
+  }
+  
+  private void runTeraSort(Configuration conf,
+      Path sortInput, Path sortOutput) throws Exception {
+
+    // Setup command-line arguments to 'sort'
+    String[] sortArgs = {sortInput.toString(), sortOutput.toString()};
+    
+    // Run Sort
+    assertEquals(ToolRunner.run(conf, new TeraSort(), sortArgs), 0);
+  }
+  
+  private void runTeraValidator(Configuration job, 
+                                       Path sortOutput, Path valOutput) 
+  throws Exception {
+    String[] svArgs = {sortOutput.toString(), valOutput.toString()};
+
+    // Run Tera-Validator
+    assertEquals(ToolRunner.run(job, new TeraValidate(), svArgs), 0);
+  }
+  
+  public void testTeraSort() throws Exception {
+    // Run TeraGen to generate input for 'terasort'
+    runTeraGen(createJobConf(), SORT_INPUT_PATH);
+
+    // Run terasort
+    runTeraSort(createJobConf(), SORT_INPUT_PATH, SORT_OUTPUT_PATH);
+
+    // Run tera-validator to check if sort worked correctly
+    runTeraValidator(createJobConf(), SORT_OUTPUT_PATH,
+      TERA_OUTPUT_PATH);
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/test/java/org/apache/tez/mapreduce/examples/terasort/TestTeraSort.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-mapreduce/pom.xml
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/pom.xml?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/pom.xml (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/pom.xml Thu Apr 18 23:54:18 2013
@@ -0,0 +1,79 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.tez</groupId>
+    <artifactId>tez</artifactId>
+    <version>0.2.0-SNAPSHOT</version>
+  </parent>
+  <artifactId>tez-mapreduce</artifactId>
+
+  <dependencies>
+    <dependency>
+      <groupId>com.google.inject</groupId>
+      <artifactId>guice</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-engine</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+    </dependency>
+    <!-- Not really needed here, only needed in the AM. Pulling all MapReduce dependencies in the tez-mapreduce module -->
+    <!-- Needed to figure out shuffle meta information-->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-shuffle</artifactId>
+    </dependency>
+    <!-- Not really needed here, only needed in the AM. Pulling all MapReduce dependencies in the tez-mapreduce module -->
+    <!-- Needed for tez2 api, JobId etc etc. Should be possible to get rid of part of this in the new AM. Later -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.inject.extensions</groupId>
+      <artifactId>guice-assistedinject</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+        <configuration>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/pom.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalClientProtocolProvider.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalClientProtocolProvider.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalClientProtocolProvider.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalClientProtocolProvider.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+
+@InterfaceAudience.Private
+public class LocalClientProtocolProvider extends ClientProtocolProvider {
+
+  @Override
+  public ClientProtocol create(Configuration conf) throws IOException {
+    String framework =
+        conf.get(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
+    if (!MRConfig.LOCAL_FRAMEWORK_NAME.equals(framework)) {
+      return null;
+    }
+    String tracker = conf.get(JTConfig.JT_IPC_ADDRESS, "local");
+    if ("local".equals(tracker)) {
+      conf.setInt("mapreduce.job.maps", 1);
+      return new LocalJobRunner(conf);
+    } else {
+      throw new IOException("Invalid \"" + JTConfig.JT_IPC_ADDRESS
+          + "\" configuration value for LocalJobRunner : \""
+          + tracker + "\"");
+    }
+  }
+
+  @Override
+  public ClientProtocol create(InetSocketAddress addr, Configuration conf) {
+    return null; // LocalJobRunner doesn't use a socket
+  }
+
+  @Override
+  public void close(ClientProtocol clientProtocol) {
+    // no clean up required
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalClientProtocolProvider.java
------------------------------------------------------------------------------
    svn:eol-style = native
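
The provider above only produces a client when mapreduce.framework.name resolves to "local" (which is also the default it assumes when the key is unset). A minimal sketch of steering a job onto LocalJobRunner through that setting (the class name is invented for illustration):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.MRConfig;

    public class LocalRunnerDemo {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // request the local framework so provider resolution selects
        // LocalClientProtocolProvider and hands back a LocalJobRunner
        conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
        Job job = Job.getInstance(conf);
        System.out.println("framework = "
            + job.getConfiguration().get(MRConfig.FRAMEWORK_NAME));
      }
    }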


