hadoop-mapreduce-commits mailing list archives

From: omal...@apache.org
Subject: svn commit: r816831 [2/2] - in /hadoop/mapreduce/trunk: ./ src/examples/org/apache/hadoop/examples/terasort/ src/examples/org/apache/hadoop/examples/terasort/2009-write-up/ src/java/org/apache/hadoop/mapred/ src/test/
Date: Sat, 19 Sep 2009 00:26:09 GMT
Modified: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraOutputFormat.java?rev=816831&r1=816830&r2=816831&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraOutputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraOutputFormat.java Sat Sep 19 00:26:07 2009
@@ -18,23 +18,30 @@
 
 package org.apache.hadoop.examples.terasort;
 
-import java.io.DataOutputStream;
 import java.io.IOException;
 
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileOutputCommitter;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.InvalidJobConfException;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.OutputCommitter;
 import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptContext;
 import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.util.Progressable;
 
 /**
- * A streamlined text output format that writes key, value, and "\r\n".
+ * An output format that writes the key and value appended together.
  */
-public class TeraOutputFormat extends TextOutputFormat<Text,Text> {
-  static final String FINAL_SYNC_ATTRIBUTE = "mapreduce.terasort.final.sync";
+public class TeraOutputFormat extends FileOutputFormat<Text,Text> {
+  static final String FINAL_SYNC_ATTRIBUTE = "terasort.final.sync";
 
   /**
    * Set the requirement for a final sync before the stream is closed.
@@ -50,28 +57,38 @@
     return conf.getBoolean(FINAL_SYNC_ATTRIBUTE, false);
   }
 
-  static class TeraRecordWriter extends LineRecordWriter<Text,Text> {
-    private static final byte[] newLine = "\r\n".getBytes();
+  static class TeraRecordWriter implements RecordWriter<Text,Text> {
     private boolean finalSync = false;
+    private FSDataOutputStream out;
 
-    public TeraRecordWriter(DataOutputStream out,
+    public TeraRecordWriter(FSDataOutputStream out,
                             JobConf conf) {
-      super(out);
       finalSync = getFinalSync(conf);
+      this.out = out;
     }
 
     public synchronized void write(Text key, 
                                    Text value) throws IOException {
       out.write(key.getBytes(), 0, key.getLength());
       out.write(value.getBytes(), 0, value.getLength());
-      out.write(newLine, 0, newLine.length);
     }
     
-    public void close() throws IOException {
+    public void close(Reporter reporter) throws IOException {
       if (finalSync) {
-        ((FSDataOutputStream) out).sync();
+        out.sync();
       }
-      super.close(null);
+      out.close();
+    }
+  }
+
+  @Override
+  public void checkOutputSpecs(FileSystem ignored, 
+                               JobConf job
+                              ) throws InvalidJobConfException, IOException {
+    // Ensure that the output directory is set and not already there
+    Path outDir = getOutputPath(job);
+    if (outDir == null) {
+      throw new InvalidJobConfException("Output directory not set in JobConf.");
     }
   }
 
@@ -85,4 +102,25 @@
     FSDataOutputStream fileOut = fs.create(new Path(dir, name), progress);
     return new TeraRecordWriter(fileOut, job);
   }
+  
+  public static class TeraOutputCommitter extends FileOutputCommitter {
+
+    @Override
+    public void cleanupJob(JobContext jobContext) {
+    }
+
+    @Override
+    public boolean needsTaskCommit(TaskAttemptContext taskContext) {
+      return taskContext.getTaskAttemptID().getTaskID().getTaskType() ==
+               TaskType.REDUCE;
+    }
+
+    @Override
+    public void setupJob(JobContext jobContext) {
+    }
+
+    @Override
+    public void setupTask(TaskAttemptContext taskContext) {
+    }
+  }
 }
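
The rewritten writer emits the key bytes immediately followed by the value bytes, with no record delimiter, so a reader has to rely on terasort's fixed-width records. A minimal sketch of reading one record back, assuming the standard 10-byte key / 90-byte value layout (the widths are an assumption, not part of this diff):

    import java.io.DataInputStream;
    import java.io.IOException;

    class TeraRecordReaderSketch {
      static final int KEY_LEN = 10;    // assumed terasort key width
      static final int VALUE_LEN = 90;  // assumed terasort value width

      // Reads one fixed-width record: key bytes first, value bytes directly
      // after, mirroring TeraRecordWriter.write() above.
      static byte[][] readRecord(DataInputStream in) throws IOException {
        byte[] key = new byte[KEY_LEN];
        byte[] value = new byte[VALUE_LEN];
        in.readFully(key);
        in.readFully(value);
        return new byte[][] { key, value };
      }
    }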

Added: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraScheduler.java?rev=816831&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraScheduler.java (added)
+++ hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraScheduler.java Sat Sep 19 00:26:07 2009
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.examples.terasort;
+
+import java.io.*;
+import java.util.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.examples.terasort.TeraInputFormat.TeraFileSplit;
+import org.apache.hadoop.mapred.FileSplit;
+
+class TeraScheduler {
+  private static final Log LOG = LogFactory.getLog(TeraScheduler.class);
+  private InputSplit[] splits;
+  private List<Host> hosts = new ArrayList<Host>();
+  private int slotsPerHost;
+  private int remainingSplits = 0;
+  private FileSplit[] realSplits = null;
+
+  static class InputSplit {
+    String filename;
+    boolean isAssigned = false;
+    List<Host> locations = new ArrayList<Host>();
+    InputSplit(String filename) {
+      this.filename = filename;
+    }
+    public String toString() {
+      StringBuffer result = new StringBuffer();
+      result.append(filename);
+      result.append(" on ");
+      for(Host host: locations) {
+        result.append(host.hostname);
+        result.append(", ");
+      }
+      return result.toString();
+    }
+  }
+  static class Host {
+    String hostname;
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+    Host(String hostname) {
+      this.hostname = hostname;
+    }
+    public String toString() {
+      StringBuffer result = new StringBuffer();
+      result.append(splits.size());
+      result.append(" ");
+      result.append(hostname);
+      return result.toString();
+    }
+  }
+
+  List<String> readFile(String filename) throws IOException {
+    List<String> result = new ArrayList<String>(10000);
+    BufferedReader in = new BufferedReader(new FileReader(filename));
+    String line = in.readLine();
+    while (line != null) {
+      result.add(line);
+      line = in.readLine();
+    }
+    in.close();
+    return result;
+  }
+
+  public TeraScheduler(String splitFilename, 
+                       String nodeFilename) throws IOException {
+    slotsPerHost = 4;
+    // get the hosts
+    Map<String, Host> hostIds = new HashMap<String,Host>();
+    for(String hostName: readFile(nodeFilename)) {
+      Host host = new Host(hostName);
+      hosts.add(host);
+      hostIds.put(hostName, host);
+    }
+    // read the blocks
+    List<String> splitLines = readFile(splitFilename);
+    splits = new InputSplit[splitLines.size()];
+    remainingSplits = 0;
+    for(String line: splitLines) {
+      StringTokenizer itr = new StringTokenizer(line);
+      InputSplit newSplit = new InputSplit(itr.nextToken());
+      splits[remainingSplits++] = newSplit;
+      while (itr.hasMoreTokens()) {
+        Host host = hostIds.get(itr.nextToken());
+        newSplit.locations.add(host);
+        host.splits.add(newSplit);
+      }
+    }
+  }
+
+  public TeraScheduler(FileSplit[] realSplits,
+                       Configuration conf) throws IOException {
+    this.realSplits = realSplits;
+    this.slotsPerHost = conf.getInt("mapred.tasktracker.map.tasks.maximum", 4);
+    Map<String, Host> hostTable = new HashMap<String, Host>();
+    splits = new InputSplit[realSplits.length];
+    for(FileSplit realSplit: realSplits) {
+      InputSplit split = new InputSplit(realSplit.getPath().toString());
+      splits[remainingSplits++] = split;
+      for(String hostname: realSplit.getLocations()) {
+        Host host = hostTable.get(hostname);
+        if (host == null) {
+          host = new Host(hostname);
+          hostTable.put(hostname, host);
+          hosts.add(host);
+        }
+        host.splits.add(split);
+        split.locations.add(host);
+      }
+    }
+  }
+
+  Host pickBestHost() {
+    Host result = null;
+    int splits = Integer.MAX_VALUE;
+    for(Host host: hosts) {
+      if (host.splits.size() < splits) {
+        result = host;
+        splits = host.splits.size();
+      }
+    }
+    if (result != null) {
+      hosts.remove(result);
+      LOG.debug("picking " + result);
+    }
+    return result;
+  }
+
+  void pickBestSplits(Host host) {
+    int tasksToPick = Math.min(slotsPerHost, 
+                               (int) Math.ceil((double) remainingSplits / 
+                                               hosts.size()));
+    InputSplit[] best = new InputSplit[tasksToPick];
+    for(InputSplit cur: host.splits) {
+      LOG.debug("  examine: " + cur.filename + " " + cur.locations.size());
+      int i = 0;
+      while (i < tasksToPick && best[i] != null && 
+             best[i].locations.size() <= cur.locations.size()) {
+        i += 1;
+      }
+      if (i < tasksToPick) {
+        for(int j = tasksToPick - 1; j > i; --j) {
+          best[j] = best[j-1];
+        }
+        best[i] = cur;
+      }
+    }
+    // for the chosen blocks, remove them from the other locations
+    for(int i=0; i < tasksToPick; ++i) {
+      if (best[i] != null) {
+        LOG.debug(" best: " + best[i].filename);
+        for (Host other: best[i].locations) {
+          other.splits.remove(best[i]);
+        }
+        best[i].locations.clear();
+        best[i].locations.add(host);
+        best[i].isAssigned = true;
+        remainingSplits -= 1;
+      }
+    }
+    // for the non-chosen blocks, remove this host
+    for(InputSplit cur: host.splits) {
+      if (!cur.isAssigned) {
+        cur.locations.remove(host);
+      }
+    }
+  }
+  
+  void solve() throws IOException {
+    Host host = pickBestHost();
+    while (host != null) {
+      pickBestSplits(host);
+      host = pickBestHost();
+    }
+  }
+
+  /**
+   * Solve the schedule and modify the FileSplit array to reflect the new
+   * schedule. It will move placed splits to the front and unplaceable
+   * splits to the end.
+   * @return a new list of FileSplits that are modified to have the
+   *    best host as the only host.
+   * @throws IOException
+   */
+  public FileSplit[] getNewFileSplits() throws IOException {
+    solve();
+    FileSplit[] result = new FileSplit[realSplits.length];
+    int left = 0;
+    int right = realSplits.length - 1;
+    for(int i=0; i < splits.length; ++i) {
+      if (splits[i].isAssigned) {
+        // copy the split and fix up the locations
+        ((TeraFileSplit) realSplits[i]).setLocations
+           (new String[]{splits[i].locations.get(0).hostname});
+        result[left++] = realSplits[i];
+      } else {
+        result[right--] = realSplits[i];
+      }
+    }
+    return result;
+  }
+
+  public static void main(String[] args) throws IOException {
+    TeraScheduler problem = new TeraScheduler("block-loc.txt", "nodes");
+    for(Host host: problem.hosts) {
+      System.out.println(host);
+    }
+    LOG.info("starting solve");
+    problem.solve();
+    List<InputSplit> leftOvers = new ArrayList<InputSplit>();
+    for(int i=0; i < problem.splits.length; ++i) {
+      if (problem.splits[i].isAssigned) {
+        System.out.println("sched: " + problem.splits[i]);        
+      } else {
+        leftOvers.add(problem.splits[i]);
+      }
+    }
+    for(InputSplit cur: leftOvers) {
+      System.out.println("left: " + cur);
+    }
+    System.out.println("left over: " + leftOvers.size());
+    LOG.info("done");
+  }
+
+}
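
The scheduler is greedy: pickBestHost() always takes the host with the fewest splits left, and pickBestSplits() hands that host the splits with the fewest alternative locations. A hedged sketch of driving it from an input format via the FileSplit[] constructor; this assumes the same package as TeraScheduler and that the array holds TeraFileSplit instances, since getNewFileSplits() casts to that type when rewriting locations:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.FileSplit;

    class SchedulerSketch {
      // Rebalance splits so each placed split is pinned to a single host.
      static FileSplit[] rebalance(FileSplit[] splits, Configuration conf)
          throws IOException {
        TeraScheduler scheduler = new TeraScheduler(splits, conf);
        // solve() repeatedly pairs the emptiest host with its rarest splits;
        // placed splits come back first, unplaceable ones last.
        return scheduler.getNewFileSplits();
      }
    }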

Modified: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraSort.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraSort.java?rev=816831&r1=816830&r2=816831&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraSort.java (original)
+++ hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraSort.java Sat Sep 19 00:26:07 2009
@@ -18,20 +18,19 @@
 
 package org.apache.hadoop.examples.terasort;
 
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.examples.terasort.TeraOutputFormat.TeraOutputCommitter;
+import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobClient;
@@ -88,13 +87,13 @@
         if (key.getLength() <= level) {
           return child[0].findPartition(key);
         }
-        return child[key.getBytes()[level]].findPartition(key);
+        return child[key.getBytes()[level] & 0xff].findPartition(key);
       }
       void setChild(int idx, TrieNode child) {
         this.child[idx] = child;
       }
       void print(PrintStream strm) throws IOException {
-        for(int ch=0; ch < 255; ++ch) {
+        for(int ch=0; ch < 256; ++ch) {
           for(int i = 0; i < 2*getLevel(); ++i) {
             strm.print(' ');
           }
@@ -123,7 +122,7 @@
       }
       int findPartition(Text key) {
         for(int i=lower; i<upper; ++i) {
-          if (splitPoints[i].compareTo(key) >= 0) {
+          if (splitPoints[i].compareTo(key) > 0) {
             return i;
           }
         }
@@ -150,16 +149,15 @@
      */
     private static Text[] readPartitions(FileSystem fs, Path p, 
                                          JobConf job) throws IOException {
-      SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, job);
-      List<Text> parts = new ArrayList<Text>();
-      Text key = new Text();
-      NullWritable value = NullWritable.get();
-      while (reader.next(key, value)) {
-        parts.add(key);
-        key = new Text();
+      int reduces = job.getNumReduceTasks();
+      Text[] result = new Text[reduces - 1];
+      DataInputStream reader = fs.open(p);
+      for(int i=0; i < reduces - 1; ++i) {
+        result[i] = new Text();
+        result[i].readFields(reader);
       }
       reader.close();
-      return parts.toArray(new Text[parts.size()]);  
+      return result;
     }
 
     /**
@@ -197,7 +195,7 @@
                                      maxDepth);
       }
       // pick up the rest
-      trial.getBytes()[depth] = 127;
+      trial.getBytes()[depth] = (byte) 255;
       result.child[255] = buildTrie(splits, currentBound, upper, trial,
                                     maxDepth);
       return result;
@@ -223,27 +221,77 @@
     
   }
   
+  /**
+   * A total order partitioner that assigns keys based on their first 
+   * PREFIX_LENGTH bytes, assuming a flat distribution.
+   */
+  public static class SimplePartitioner implements Partitioner<Text, Text>{
+    int prefixesPerReduce;
+    private static final int PREFIX_LENGTH = 3;
+    public void configure(JobConf job) {
+      prefixesPerReduce = (int) Math.ceil((1 << (8 * PREFIX_LENGTH)) / 
+                                          (float) job.getNumReduceTasks());
+    }
+    @Override
+    public int getPartition(Text key, Text value, int numPartitions) {
+      byte[] bytes = key.getBytes();
+      int len = Math.min(PREFIX_LENGTH, key.getLength());
+      int prefix = 0;
+      for(int i=0; i < len; ++i) {
+        prefix = (prefix << 8) | (0xff & bytes[i]);
+      }
+      return prefix / prefixesPerReduce;
+    }
+  }
+
+  public static boolean getUseSimplePartitioner(Configuration conf) {
+    return conf.getBoolean("terasort.partitioner.simple", false);
+  }
+
+  public static void setUseSimplePartitioner(Configuration conf,
+                                             boolean value) {
+    conf.setBoolean("terasort.partitioner.simple", value);
+  }
+
   public int run(String[] args) throws Exception {
     LOG.info("starting");
     JobConf job = (JobConf) getConf();
     Path inputDir = new Path(args[0]);
-    inputDir = inputDir.makeQualified(inputDir.getFileSystem(job));
-    Path partitionFile = new Path(inputDir, TeraInputFormat.PARTITION_FILENAME);
-    URI partitionUri = new URI(partitionFile.toString() +
-                               "#" + TeraInputFormat.PARTITION_FILENAME);
-    TeraInputFormat.setInputPaths(job, new Path(args[0]));
-    FileOutputFormat.setOutputPath(job, new Path(args[1]));
+    Path outputDir = new Path(args[1]);
+    boolean useSimplePartitioner = getUseSimplePartitioner(job);
+    FileSystem outputFileSystem = outputDir.getFileSystem(job);
+    outputDir = outputDir.makeQualified(outputFileSystem);
+    if (outputFileSystem.exists(outputDir)) {
+      throw new IOException("Output directory " + outputDir + 
+                            " already exists.");
+    }
+    TeraInputFormat.setInputPaths(job, inputDir);
+    FileOutputFormat.setOutputPath(job, outputDir);
     job.setJobName("TeraSort");
     job.setJarByClass(TeraSort.class);
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(Text.class);
     job.setInputFormat(TeraInputFormat.class);
     job.setOutputFormat(TeraOutputFormat.class);
-    job.setPartitionerClass(TotalOrderPartitioner.class);
-    TeraInputFormat.writePartitionFile(job, partitionFile);
-    DistributedCache.addCacheFile(partitionUri, job);
-    DistributedCache.createSymlink(job);
-    job.setInt("dfs.replication", 1);
+    if (useSimplePartitioner) {
+      job.setPartitionerClass(SimplePartitioner.class);
+    } else {
+      long start = System.currentTimeMillis();
+      Path partitionFile = new Path(outputDir, 
+                                    TeraInputFormat.PARTITION_FILENAME);
+      URI partitionUri = new URI(partitionFile.toString() +
+                                 "#" + TeraInputFormat.PARTITION_FILENAME);
+      TeraInputFormat.writePartitionFile(job, partitionFile);
+      DistributedCache.addCacheFile(partitionUri, job);
+      DistributedCache.createSymlink(job);    
+      long end = System.currentTimeMillis();
+      System.out.println("Spent " + (end - start) + "ms computing partitions.");
+      job.setPartitionerClass(TotalOrderPartitioner.class);
+    }
+    job.setOutputCommitter(TeraOutputCommitter.class);
+    
+    job.setInt("dfs.replication", 
+               job.getInt("terasort.output.replication", 1));
     TeraOutputFormat.setFinalSync(job, true);
     JobClient.runJob(job);
     LOG.info("done");

Modified: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraValidate.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraValidate.java?rev=816831&r1=816830&r2=816831&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraValidate.java (original)
+++ hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraValidate.java Sat Sep 19 00:26:07 2009
@@ -20,9 +20,11 @@
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.zip.Checksum;
 
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.FileSplit;
@@ -31,9 +33,10 @@
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Partitioner;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.util.PureJavaCrc32;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -51,14 +54,24 @@
  * will have the problem report.
  */
 public class TeraValidate extends Configured implements Tool {
-  private static final Text error = new Text("error");
+  private static final Text ERROR = new Text("error");
+  private static final Text CHECKSUM = new Text("checksum");
+  
+  private static String textifyBytes(Text t) {
+    BytesWritable b = new BytesWritable();
+    b.set(t.getBytes(), 0, t.getLength());
+    return b.toString();
+  }
 
   static class ValidateMapper extends MapReduceBase 
       implements Mapper<Text,Text,Text,Text> {
     private Text lastKey;
     private OutputCollector<Text,Text> output;
     private String filename;
-    
+    private Unsigned16 checksum = new Unsigned16();
+    private Unsigned16 tmp = new Unsigned16();
+    private Checksum crc32 = new PureJavaCrc32();
+
     /**
      * Get the final part of the input name
      * @param split the input split
@@ -68,26 +81,38 @@
       return split.getPath().getName();
     }
 
+    private int getPartition(FileSplit split) {
+      return Integer.parseInt(split.getPath().getName().substring(5));
+    }
+
     public void map(Text key, Text value, OutputCollector<Text,Text> output,
                     Reporter reporter) throws IOException {
       if (lastKey == null) {
-        filename = getFilename((FileSplit) reporter.getInputSplit());
+        FileSplit fs = (FileSplit) reporter.getInputSplit();
+        filename = getFilename(fs);
         output.collect(new Text(filename + ":begin"), key);
         lastKey = new Text();
         this.output = output;
       } else {
         if (key.compareTo(lastKey) < 0) {
-          output.collect(error, new Text("misorder in " + filename + 
-                                         " last: '" + lastKey + 
-                                         "' current: '" + key + "'"));
+          output.collect(ERROR, new Text("misorder in " + filename + 
+                                         " between " + textifyBytes(lastKey) + 
+                                         " and " + textifyBytes(key)));
         }
       }
+      // compute the crc of the key and value and add it to the sum
+      crc32.reset();
+      crc32.update(key.getBytes(), 0, key.getLength());
+      crc32.update(value.getBytes(), 0, value.getLength());
+      tmp.set(crc32.getValue());
+      checksum.add(tmp);
       lastKey.set(key);
     }
     
     public void close() throws IOException {
       if (lastKey != null) {
         output.collect(new Text(filename + ":end"), lastKey);
+        output.collect(CHECKSUM, new Text(checksum.toString()));
       }
     }
   }
@@ -105,20 +130,31 @@
     public void reduce(Text key, Iterator<Text> values,
                        OutputCollector<Text, Text> output, 
                        Reporter reporter) throws IOException {
-      if (error.equals(key)) {
+      if (ERROR.equals(key)) {
         while(values.hasNext()) {
           output.collect(key, values.next());
         }
+      } else if (CHECKSUM.equals(key)) {
+        Unsigned16 tmp = new Unsigned16();
+        Unsigned16 sum = new Unsigned16();
+        while (values.hasNext()) {
+          String val = values.next().toString();
+          tmp.set(val);
+          sum.add(tmp);
+        }
+        output.collect(CHECKSUM, new Text(sum.toString()));
       } else {
         Text value = values.next();
         if (firstKey) {
           firstKey = false;
         } else {
           if (value.compareTo(lastValue) < 0) {
-            output.collect(error, 
-                           new Text("misordered keys last: " + 
-                                    lastKey + " '" + lastValue +
-                                    "' current: " + key + " '" + value + "'"));
+            output.collect(ERROR, 
+                           new Text("bad key partitioning:\n  file " + 
+                                    lastKey + " key " + 
+                                    textifyBytes(lastValue) +
+                                    "\n  file " + key + " key " + 
+                                    textifyBytes(value)));
           }
         }
         lastKey.set(key);
@@ -128,8 +164,16 @@
     
   }
 
+  private static void usage() throws IOException {
+    System.err.println("teravalidate <out-dir> <report-dir>");
+  }
+
   public int run(String[] args) throws Exception {
     JobConf job = (JobConf) getConf();
+    if (args.length != 2) {
+      usage();
+      return 1;
+    }
     TeraInputFormat.setInputPaths(job, new Path(args[0]));
     FileOutputFormat.setOutputPath(job, new Path(args[1]));
     job.setJobName("TeraValidate");
@@ -141,7 +185,8 @@
     // force a single reducer
     job.setNumReduceTasks(1);
     // force a single split 
-    job.setLong(FileInputFormat.SPLIT_MINSIZE, Long.MAX_VALUE);
+    job.setLong(org.apache.hadoop.mapreduce.lib.input.
+                FileInputFormat.SPLIT_MINSIZE, Long.MAX_VALUE);
     job.setInputFormat(TeraInputFormat.class);
     JobClient.runJob(job);
     return 0;
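
ValidateMapper now CRCs every key/value pair and folds the 32-bit results into a 128-bit Unsigned16 sum, and the lone reducer adds the per-map totals, so the final CHECKSUM value is independent of record order. A minimal sketch of the per-record step, substituting java.util.zip.CRC32 for Hadoop's PureJavaCrc32 (both implement java.util.zip.Checksum); it assumes the terasort package, where Unsigned16 is visible:

    import java.util.zip.CRC32;
    import java.util.zip.Checksum;

    class ChecksumSketch {
      private final Checksum crc32 = new CRC32();
      private final Unsigned16 checksum = new Unsigned16();
      private final Unsigned16 tmp = new Unsigned16();

      // Fold one record's CRC into the running 128-bit sum, as
      // ValidateMapper.map() does above.
      void update(byte[] key, byte[] value) {
        crc32.reset();
        crc32.update(key, 0, key.length);
        crc32.update(value, 0, value.length);
        tmp.set(crc32.getValue());   // 32-bit CRC widened to 128 bits
        checksum.add(tmp);           // effectively overflow-free
      }
    }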

Added: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/Unsigned16.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/Unsigned16.java?rev=816831&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/Unsigned16.java (added)
+++ hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/Unsigned16.java Sat Sep 19 00:26:07 2009
@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.examples.terasort;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * An unsigned 16 byte integer class that supports addition, multiplication,
+ * and left shifts.
+ */
+class Unsigned16 implements Writable {
+  private long hi8;
+  private long lo8;
+
+  public Unsigned16() {
+    hi8 = 0;
+    lo8 = 0;
+  }
+
+  public Unsigned16(long l) {
+    hi8 = 0;
+    lo8 = l;
+  }
+
+  public Unsigned16(Unsigned16 other) {
+    hi8 = other.hi8;
+    lo8 = other.lo8;
+  }
+  
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof Unsigned16) {
+      Unsigned16 other = (Unsigned16) o;
+      return other.hi8 == hi8 && other.lo8 == lo8;
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return (int) lo8;
+  }
+
+  /**
+   * Parse a hex string
+   * @param s the hex string
+   */
+  public Unsigned16(String s) throws NumberFormatException {
+    set(s);
+  }
+
+  /**
+   * Set the number from a hex string
+   * @param s the number in hexadecimal
+   * @throws NumberFormatException if the number is invalid
+   */
+  public void set(String s) throws NumberFormatException {
+    hi8 = 0;
+    lo8 = 0;
+    final long lastDigit = 0xfl << 60;
+    for (int i = 0; i < s.length(); ++i) {
+      int digit = getHexDigit(s.charAt(i));
+      if ((lastDigit & hi8) != 0) {
+        throw new NumberFormatException(s + " overflowed 16 bytes");
+      }
+      hi8 <<= 4;
+      hi8 |= (lo8 & lastDigit) >>> 60;
+      lo8 <<= 4;
+      lo8 |= digit;
+    }    
+  }
+
+  /**
+   * Set the number to a given long.
+   * @param l the new value, which is treated as an unsigned number
+   */
+  public void set(long l) {
+    lo8 = l;
+    hi8 = 0;
+  }
+
+  /**
+   * Map a hexadecimal character into a digit.
+   * @param ch the character
+   * @return the digit from 0 to 15
+   * @throws NumberFormatException
+   */
+  private static int getHexDigit(char ch) throws NumberFormatException {
+    if (ch >= '0' && ch <= '9') {
+      return ch - '0';
+    }
+    if (ch >= 'a' && ch <= 'f') {
+      return ch - 'a' + 10;
+    }
+    if (ch >= 'A' && ch <= 'F') {
+      return ch - 'A' + 10;
+    }
+    throw new NumberFormatException(ch + " is not a valid hex digit");
+  }
+
+  private static final Unsigned16 TEN = new Unsigned16(10);
+
+  public static Unsigned16 fromDecimal(String s) throws NumberFormatException {
+    Unsigned16 result = new Unsigned16();
+    Unsigned16 tmp = new Unsigned16();
+    for(int i=0; i < s.length(); i++) {
+      char ch = s.charAt(i);
+      if (ch < '0' || ch > '9') {
+        throw new NumberFormatException(ch + " not a valid decimal digit");
+      }
+      int digit = ch - '0';
+      result.multiply(TEN);
+      tmp.set(digit);
+      result.add(tmp);
+    }
+    return result;
+  }
+
+  /**
+   * Return the number as a hex string.
+   */
+  public String toString() {
+    if (hi8 == 0) {
+      return Long.toHexString(lo8);
+    } else {
+      StringBuilder result = new StringBuilder();
+      result.append(Long.toHexString(hi8));
+      String loString = Long.toHexString(lo8);
+      for(int i=loString.length(); i < 16; ++i) {
+        result.append('0');
+      }
+      result.append(loString);
+      return result.toString();
+    }
+  }
+
+  /**
+   * Get a given byte from the number.
+   * @param b the byte to get with 0 meaning the most significant byte
+   * @return the byte or 0 if b is outside of 0..15
+   */
+  public byte getByte(int b) {
+    if (b >= 0 && b < 16) {
+      if (b < 8) {
+        return (byte) (hi8 >> (56 - 8*b));
+      } else {
+        return (byte) (lo8 >> (120 - 8*b));
+      }
+    }
+    return 0;
+  }
+
+  /**
+   * Get the hexadecimal digit at the given position.
+   * @param p the digit position to get with 0 meaning the most significant
+   * @return the character or '0' if p is outside of 0..31
+   */
+  public char getHexDigit(int p) {
+    byte digit = getByte(p / 2);
+    if (p % 2 == 0) {
+      digit >>>= 4;
+    }
+    digit &= 0xf;
+    if (digit < 10) {
+      return (char) ('0' + digit);
+    } else {
+      return (char) ('A' + digit - 10);
+    }
+  }
+
+  /**
+   * Get the high 8 bytes as a long.
+   */
+  public long getHigh8() {
+    return hi8;
+  }
+  
+  /**
+   * Get the low 8 bytes as a long.
+   */
+  public long getLow8() {
+    return lo8;
+  }
+
+  /**
+   * Multiply the current number by a 16 byte unsigned integer. Overflow is
+   * not detected and only the low 16 bytes of the product are kept. The
+   * numbers are divided into 32 and 31 bit chunks so that the product of
+   * two chunks fits in the unsigned 63 bits of a long.
+   * @param b the other number
+   */
+  void multiply(Unsigned16 b) {
+    // divide the left into 4 32 bit chunks
+    long[] left = new long[4];
+    left[0] = lo8 & 0xffffffffl;
+    left[1] = lo8 >>> 32;
+    left[2] = hi8 & 0xffffffffl;
+    left[3] = hi8 >>> 32;
+    // divide the right into 5 31 bit chunks
+    long[] right = new long[5];
+    right[0] = b.lo8 & 0x7fffffffl;
+    right[1] = (b.lo8 >>> 31) & 0x7fffffffl;
+    right[2] = (b.lo8 >>> 62) + ((b.hi8 & 0x1fffffffl) << 2);
+    right[3] = (b.hi8 >>> 29) & 0x7fffffffl;
+    right[4] = (b.hi8 >>> 60);
+    // clear the cur value
+    set(0);
+    Unsigned16 tmp = new Unsigned16();
+    for(int l=0; l < 4; ++l) {
+      for (int r=0; r < 5; ++r) {
+        long prod = left[l] * right[r];
+        if (prod != 0) {
+          int off = l*32 + r*31;
+          tmp.set(prod);
+          tmp.shiftLeft(off);
+          add(tmp);
+        }
+      }
+    }
+  }
+
+  /**
+   * Add the given number into the current number.
+   * @param b the other number
+   */
+  public void add(Unsigned16 b) {
+    long sumHi;
+    long sumLo;
+    long  reshibit, hibit0, hibit1;
+
+    sumHi = hi8 + b.hi8;
+
+    hibit0 = (lo8 & 0x8000000000000000L);
+    hibit1 = (b.lo8 & 0x8000000000000000L);
+    sumLo = lo8 + b.lo8;
+    reshibit = (sumLo & 0x8000000000000000L);
+    if ((hibit0 & hibit1) != 0 | ((hibit0 ^ hibit1) != 0 && reshibit == 0))
+      sumHi++;  /* add carry bit */
+    hi8 = sumHi;
+    lo8 = sumLo;
+  }
+
+  /**
+   * Shift the number left by a given number of bit positions. Only the low
+   * order 128 bits of the result are kept.
+   * @param bits the bit positions to shift by
+   */
+  public void shiftLeft(int bits) {
+    if (bits != 0) {
+      if (bits < 64) {
+        hi8 <<= bits;
+        hi8 |= (lo8 >>> (64 - bits));
+        lo8 <<= bits;
+      } else if (bits < 128) {
+        hi8 = lo8 << (bits - 64);
+        lo8 = 0;
+      } else {
+        hi8 = 0;
+        lo8 = 0;
+      }
+    }
+  }
+  
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    hi8 = in.readLong();
+    lo8 = in.readLong();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeLong(hi8);
+    out.writeLong(lo8);
+  }
+  
+  
+}
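
A short sketch exercising the class as defined above (multiply() is package-private, so this assumes the terasort package): parsing 2^64 from decimal, carrying across the 64-bit boundary on add, and hex rendering via toString():

    class Unsigned16Demo {
      public static void main(String[] args) {
        Unsigned16 a = Unsigned16.fromDecimal("18446744073709551616"); // 2^64
        Unsigned16 b = new Unsigned16(-1L);    // all 64 bits set: 2^64 - 1
        a.add(b);                              // carries into the high 8 bytes
        System.out.println(a);                 // prints 1ffffffffffffffff
        b.multiply(new Unsigned16(2));         // in place: 2 * (2^64 - 1)
        System.out.println(b);                 // prints 1fffffffffffffffe
      }
    }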

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java?rev=816831&r1=816830&r2=816831&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java Sat Sep 19 00:26:07 2009
@@ -195,6 +195,15 @@
     return result.toArray(new FileStatus[result.size()]);
   }
 
+  /**
+   * A factory that makes the split for this class. It can be overridden
+   * by sub-classes to make sub-types.
+   */
+  protected FileSplit makeSplit(Path file, long start, long length, 
+                                String[] hosts) {
+    return new FileSplit(file, start, length, hosts);
+  }
+
   /** Splits files returned by {@link #listStatus(JobConf)} when
    * they're too big.*/ 
   @SuppressWarnings("deprecation")
@@ -230,21 +239,21 @@
         while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
           String[] splitHosts = getSplitHosts(blkLocations, 
               length-bytesRemaining, splitSize, clusterMap);
-          splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 
-              splitHosts));
+          splits.add(makeSplit(path, length-bytesRemaining, splitSize, 
+                               splitHosts));
           bytesRemaining -= splitSize;
         }
         
         if (bytesRemaining != 0) {
-          splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining, 
+          splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, 
                      blkLocations[blkLocations.length-1].getHosts()));
         }
       } else if (length != 0) {
         String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
-        splits.add(new FileSplit(path, 0, length, splitHosts));
+        splits.add(makeSplit(path, 0, length, splitHosts));
       } else { 
         //Create empty hosts array for zero length files
-        splits.add(new FileSplit(path, 0, length, new String[0]));
+        splits.add(makeSplit(path, 0, length, new String[0]));
       }
     }
     LOG.debug("Total # of splits: " + splits.size());

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileSplit.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileSplit.java?rev=816831&r1=816830&r2=816831&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileSplit.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileSplit.java Sat Sep 19 00:26:07 2009
@@ -34,7 +34,7 @@
 public class FileSplit extends org.apache.hadoop.mapreduce.InputSplit 
                        implements InputSplit {
   org.apache.hadoop.mapreduce.lib.input.FileSplit fs; 
-  FileSplit() {
+  protected FileSplit() {
     fs = new org.apache.hadoop.mapreduce.lib.input.FileSplit();
   }
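
Together with the makeSplit() factory added to FileInputFormat above, the now-protected constructor lets an input format define its own split subtype (TeraFileSplit in this commit is the motivating case). A hedged sketch with hypothetical names; the extra field is not serialized here:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileSplit;

    // Hypothetical split subtype carrying one extra per-split field.
    class TaggedFileSplit extends FileSplit {
      String tag;
      TaggedFileSplit() { }               // uses the new protected ctor
      TaggedFileSplit(Path file, long start, long length, String[] hosts) {
        super(file, start, length, hosts);
      }
    }

    // Abstract only to elide getRecordReader(); getSplits() now produces
    // TaggedFileSplits through the overridden factory.
    abstract class TaggedInputFormat extends FileInputFormat<Text, Text> {
      @Override
      protected FileSplit makeSplit(Path file, long start, long length,
                                    String[] hosts) {
        return new TaggedFileSplit(file, start, length, hosts);
      }
    }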
 

Modified: hadoop/mapreduce/trunk/src/test/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/findbugsExcludeFile.xml?rev=816831&r1=816830&r2=816831&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/findbugsExcludeFile.xml (original)
+++ hadoop/mapreduce/trunk/src/test/findbugsExcludeFile.xml Sat Sep 19 00:26:07 2009
@@ -265,5 +265,20 @@
     <Match>
        <Class name="org.apache.hadoop.mapred.TaskScheduler$QueueRefresher" />
        <Bug pattern="SIC_INNER_SHOULD_BE_STATIC" />
+    </Match>
+
+    <Match>
+      <Class name="org.apache.hadoop.examples.terasort.TeraInputFormat$1" />
+      <Method name="run" />
+      <Bug pattern="DM_EXIT" />
+    </Match>
+    <Match>
+      <Class name="org.apache.hadoop.examples.terasort.TeraOutputFormat$TeraOutputCommitter"/>
+      <Bug pattern="NM_WRONG_PACKAGE_INTENTIONAL" />
+    </Match>
+    <Match>
+      <Class name="org.apache.hadoop.examples.terasort.Unsigned16" />
+      <Method name="getHexDigit"/>
+      <Bug pattern="ICAST_QUESTIONABLE_UNSIGNED_RIGHT_SHIFT" />
      </Match>
  </FindBugsFilter>
