hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ste...@apache.org
Subject svn commit: r885145 [13/34] - in /hadoop/mapreduce/branches/MAPREDUCE-233: ./ .eclipse.templates/ .eclipse.templates/.launches/ conf/ ivy/ lib/ src/benchmarks/gridmix/ src/benchmarks/gridmix/pipesort/ src/benchmarks/gridmix2/ src/benchmarks/gridmix2/sr...
Date Sat, 28 Nov 2009 20:26:22 GMT
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/RandomTextWriter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/RandomTextWriter.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/RandomTextWriter.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/RandomTextWriter.java Sat Nov 28 20:26:01 2009
@@ -21,7 +21,6 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Date;
-import java.util.Formatter;
 import java.util.List;
 import java.util.Random;
 
@@ -48,23 +47,23 @@
  * <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
  * <configuration>
  *   <property>
- *     <name>test.randomtextwrite.min_words_key</name>
+ *     <name>mapreduce.randomtextwriter.minwordskey</name>
  *     <value>5</value>
  *   </property>
  *   <property>
- *     <name>test.randomtextwrite.max_words_key</name>
+ *     <name>mapreduce.randomtextwriter.maxwordskey</name>
  *     <value>10</value>
  *   </property>
  *   <property>
- *     <name>test.randomtextwrite.min_words_value</name>
+ *     <name>mapreduce.randomtextwriter.minwordsvalue</name>
  *     <value>20</value>
  *   </property>
  *   <property>
- *     <name>test.randomtextwrite.max_words_value</name>
+ *     <name>mapreduce.randomtextwriter.maxwordsvalue</name>
  *     <value>100</value>
  *   </property>
  *   <property>
- *     <name>test.randomtextwrite.total_bytes</name>
+ *     <name>mapreduce.randomtextwriter.totalbytes</name>
  *     <value>1099511627776</value>
  *   </property>
  * </configuration></xmp>
@@ -76,6 +75,16 @@
  *            [-outFormat <i>output format class</i>] <i>output</i> 
  */
 public class RandomTextWriter extends Configured implements Tool {
+  public static final String TOTAL_BYTES = 
+    "mapreduce.randomtextwriter.totalbytes";
+  public static final String BYTES_PER_MAP = 
+    "mapreduce.randomtextwriter.bytespermap";
+  public static final String MAPS_PER_HOST = 
+    "mapreduce.randomtextwriter.mapsperhost";
+  public static final String MAX_VALUE = "mapreduce.randomtextwriter.maxwordsvalue";
+  public static final String MIN_VALUE = "mapreduce.randomtextwriter.minwordsvalue";
+  public static final String MIN_KEY = "mapreduce.randomtextwriter.minwordskey";
+  public static final String MAX_KEY = "mapreduce.randomtextwriter.maxwordskey";
   
   static int printUsage() {
     System.out.println("randomtextwriter " +
@@ -97,35 +106,19 @@
     private int wordsInKeyRange;
     private int minWordsInValue;
     private int wordsInValueRange;
-
-    private final Random random = new Random();
-    private final Text keyWords = new Text();
-    private final Text valueWords = new Text();
-    private final String STATUS_MSG = "wrote record %d. %d bytes left.";
-    private final Formatter statusFormat = new Formatter(new StringBuilder());
-
-    private Counter byteCounter;
-    private Counter recordCounter;
-
+    private Random random = new Random();
+    
     /**
      * Save the configuration value that we need to write the data.
      */
     public void setup(Context context) {
       Configuration conf = context.getConfiguration();
-      numBytesToWrite = conf.getLong("test.randomtextwrite.bytes_per_map",
+      numBytesToWrite = conf.getLong(BYTES_PER_MAP,
                                     1*1024*1024*1024);
-      minWordsInKey = 
-        conf.getInt("test.randomtextwrite.min_words_key", 5);
-      wordsInKeyRange = 
-        (conf.getInt("test.randomtextwrite.max_words_key", 10) - 
-         minWordsInKey);
-      minWordsInValue = 
-        conf.getInt("test.randomtextwrite.min_words_value", 10);
-      wordsInValueRange = 
-        (conf.getInt("test.randomtextwrite.max_words_value", 100) - 
-         minWordsInValue);
-      byteCounter = context.getCounter(Counters.BYTES_WRITTEN);
-      recordCounter = context.getCounter(Counters.RECORDS_WRITTEN);
+      minWordsInKey = conf.getInt(MIN_KEY, 5);
+      wordsInKeyRange = (conf.getInt(MAX_KEY, 10) - minWordsInKey);
+      minWordsInValue = conf.getInt(MIN_VALUE, 10);
+      wordsInValueRange = (conf.getInt(MAX_VALUE, 100) - minWordsInValue);
     }
     
     /**
@@ -136,39 +129,38 @@
       int itemCount = 0;
       while (numBytesToWrite > 0) {
         // Generate the key/value 
-        final int noWordsKey = minWordsInKey +
+        int noWordsKey = minWordsInKey + 
           (wordsInKeyRange != 0 ? random.nextInt(wordsInKeyRange) : 0);
-        final int noWordsValue = minWordsInValue +
+        int noWordsValue = minWordsInValue + 
           (wordsInValueRange != 0 ? random.nextInt(wordsInValueRange) : 0);
-
-        int recordBytes = generateSentence(keyWords, noWordsKey);
-        recordBytes += generateSentence(valueWords, noWordsValue);
-        numBytesToWrite -= recordBytes;
-
+        Text keyWords = generateSentence(noWordsKey);
+        Text valueWords = generateSentence(noWordsValue);
+        
         // Write the sentence 
         context.write(keyWords, valueWords);
-
+        
+        numBytesToWrite -= (keyWords.getLength() + valueWords.getLength());
+        
         // Update counters, progress etc.
-        recordCounter.increment(1);
-        byteCounter.increment(recordBytes);
-
-        if (++itemCount % 1000 == 0) {
-          ((StringBuilder)statusFormat.out()).setLength(0);
-          context.setStatus(statusFormat.format(STATUS_MSG,
-                itemCount, numBytesToWrite).toString());
+        context.getCounter(Counters.BYTES_WRITTEN).increment(
+                  keyWords.getLength() + valueWords.getLength());
+        context.getCounter(Counters.RECORDS_WRITTEN).increment(1);
+        if (++itemCount % 200 == 0) {
+          context.setStatus("wrote record " + itemCount + ". " + 
+                             numBytesToWrite + " bytes left.");
         }
       }
       context.setStatus("done with " + itemCount + " records.");
     }
     
-    private int generateSentence(Text txt, int noWords) {
-      txt.clear();
+    private Text generateSentence(int noWords) {
+      StringBuffer sentence = new StringBuffer();
+      String space = " ";
       for (int i=0; i < noWords; ++i) {
-        final Text word = words[random.nextInt(words.length)];
-        txt.append(word.getBytes(), 0, word.getLength());
-        txt.append(SPACE, 0, SPACE.length);
+        sentence.append(words[random.nextInt(words.length)]);
+        sentence.append(space);
       }
-      return txt.getLength();
+      return new Text(sentence.toString());
     }
   }
   
@@ -187,21 +179,21 @@
     Configuration conf = getConf();
     JobClient client = new JobClient(conf);
     ClusterStatus cluster = client.getClusterStatus();
-    int numMapsPerHost = conf.getInt("test.randomtextwrite.maps_per_host", 10);
-    long numBytesToWritePerMap = conf.getLong("test.randomtextwrite.bytes_per_map",
+    int numMapsPerHost = conf.getInt(MAPS_PER_HOST, 10);
+    long numBytesToWritePerMap = conf.getLong(BYTES_PER_MAP,
                                              1*1024*1024*1024);
     if (numBytesToWritePerMap == 0) {
-      System.err.println("Cannot have test.randomtextwrite.bytes_per_map set to 0");
+      System.err.println("Cannot have " + BYTES_PER_MAP +" set to 0");
       return -2;
     }
-    long totalBytesToWrite = conf.getLong("test.randomtextwrite.total_bytes", 
+    long totalBytesToWrite = conf.getLong(TOTAL_BYTES, 
          numMapsPerHost*numBytesToWritePerMap*cluster.getTaskTrackers());
     int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap);
     if (numMaps == 0 && totalBytesToWrite > 0) {
       numMaps = 1;
-      conf.setLong("test.randomtextwrite.bytes_per_map", totalBytesToWrite);
+      conf.setLong(BYTES_PER_MAP, totalBytesToWrite);
     }
-    conf.setInt("mapred.map.tasks", numMaps);
+    conf.setInt(JobContext.NUM_MAPS, numMaps);
     
     Job job = new Job(conf);
     
@@ -257,12 +249,10 @@
     System.exit(res);
   }
 
-  private static final byte[] SPACE = " ".getBytes();
-
   /**
    * A random list of 100 words from /usr/share/dict/words
    */
-  private final static Text[] words = buildText(new String[] {
+  private static String[] words = {
                                    "diurnalness", "Homoiousian",
                                    "spiranthic", "tetragynian",
                                    "silverhead", "ungreat",
@@ -763,14 +753,5 @@
                                    "sterilely", "unrealize",
                                    "unpatched", "hypochondriacism",
                                    "critically", "cheesecutter",
-                                  });
-
-  private static Text[] buildText(String[] str) {
-    Text[] ret = new Text[str.length];
-    for (int i = 0; i < str.length; ++i) {
-      ret[i] = new Text(str[i]);
-    }
-    return ret;
-  }
-
+                                  };
 }

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/RandomWriter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/RandomWriter.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/RandomWriter.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/RandomWriter.java Sat Nov 28 20:26:01 2009
@@ -52,23 +52,23 @@
  * <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
  * <configuration>
  *   <property>
- *     <name>test.randomwrite.min_key</name>
+ *     <name>mapreduce.randomwriter.minkey</name>
  *     <value>10</value>
  *   </property>
  *   <property>
- *     <name>test.randomwrite.max_key</name>
+ *     <name>mapreduce.randomwriter.maxkey</name>
  *     <value>10</value>
  *   </property>
  *   <property>
- *     <name>test.randomwrite.min_value</name>
+ *     <name>mapreduce.randomwriter.minvalue</name>
  *     <value>90</value>
  *   </property>
  *   <property>
- *     <name>test.randomwrite.max_value</name>
+ *     <name>mapreduce.randomwriter.maxvalue</name>
  *     <value>90</value>
  *   </property>
  *   <property>
- *     <name>test.randomwrite.total_bytes</name>
+ *     <name>mapreduce.randomwriter.totalbytes</name>
  *     <value>1099511627776</value>
  *   </property>
  * </configuration></xmp>
@@ -77,6 +77,15 @@
  * and ones supported by {@link GenericOptionsParser} via the command-line.
  */
 public class RandomWriter extends Configured implements Tool {
+  public static final String TOTAL_BYTES = "mapreduce.randomwriter.totalbytes";
+  public static final String BYTES_PER_MAP = 
+    "mapreduce.randomwriter.bytespermap";
+  public static final String MAPS_PER_HOST = 
+    "mapreduce.randomwriter.mapsperhost";
+  public static final String MAX_VALUE = "mapreduce.randomwriter.maxvalue";
+  public static final String MIN_VALUE = "mapreduce.randomwriter.minvalue";
+  public static final String MIN_KEY = "mapreduce.randomwriter.minkey";
+  public static final String MAX_KEY = "mapreduce.randomwriter.maxkey";
   
   /**
    * User counters
@@ -97,7 +106,7 @@
       List<InputSplit> result = new ArrayList<InputSplit>();
       Path outDir = FileOutputFormat.getOutputPath(job);
       int numSplits = 
-            job.getConfiguration().getInt("mapred.map.tasks", 1);
+            job.getConfiguration().getInt(JobContext.NUM_MAPS, 1);
       for(int i=0; i < numSplits; ++i) {
         result.add(new FileSplit(new Path(outDir, "dummy-split-" + i), 0, 1, 
                                   (String[])null));
@@ -207,14 +216,14 @@
     @Override
     public void setup(Context context) {
       Configuration conf = context.getConfiguration();
-      numBytesToWrite = conf.getLong("test.randomwrite.bytes_per_map",
+      numBytesToWrite = conf.getLong(BYTES_PER_MAP,
                                     1*1024*1024*1024);
-      minKeySize = conf.getInt("test.randomwrite.min_key", 10);
+      minKeySize = conf.getInt(MIN_KEY, 10);
       keySizeRange = 
-        conf.getInt("test.randomwrite.max_key", 1000) - minKeySize;
-      minValueSize = conf.getInt("test.randomwrite.min_value", 0);
+        conf.getInt(MAX_KEY, 1000) - minKeySize;
+      minValueSize = conf.getInt(MIN_VALUE, 0);
       valueSizeRange = 
-        conf.getInt("test.randomwrite.max_value", 20000) - minValueSize;
+        conf.getInt(MAX_VALUE, 20000) - minValueSize;
     }
   }
   
@@ -236,21 +245,21 @@
     Configuration conf = getConf();
     JobClient client = new JobClient(conf);
     ClusterStatus cluster = client.getClusterStatus();
-    int numMapsPerHost = conf.getInt("test.randomwriter.maps_per_host", 10);
-    long numBytesToWritePerMap = conf.getLong("test.randomwrite.bytes_per_map",
+    int numMapsPerHost = conf.getInt(MAPS_PER_HOST, 10);
+    long numBytesToWritePerMap = conf.getLong(BYTES_PER_MAP,
                                              1*1024*1024*1024);
     if (numBytesToWritePerMap == 0) {
-      System.err.println("Cannot have test.randomwrite.bytes_per_map set to 0");
+      System.err.println("Cannot have" + BYTES_PER_MAP + " set to 0");
       return -2;
     }
-    long totalBytesToWrite = conf.getLong("test.randomwrite.total_bytes", 
+    long totalBytesToWrite = conf.getLong(TOTAL_BYTES, 
          numMapsPerHost*numBytesToWritePerMap*cluster.getTaskTrackers());
     int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap);
     if (numMaps == 0 && totalBytesToWrite > 0) {
       numMaps = 1;
-      conf.setLong("test.randomwrite.bytes_per_map", totalBytesToWrite);
+      conf.setLong(BYTES_PER_MAP, totalBytesToWrite);
     }
-    conf.setInt("mapred.map.tasks", numMaps);
+    conf.setInt(JobContext.NUM_MAPS, numMaps);
 
     Job job = new Job(conf);
     

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/Sort.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/Sort.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/Sort.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/Sort.java Sat Nov 28 20:26:01 2009
@@ -55,6 +55,8 @@
  *            <i>in-dir</i> <i>out-dir</i> 
  */
 public class Sort<K,V> extends Configured implements Tool {
+  public static final String REDUCES_PER_HOST = 
+    "mapreduce.sort.reducesperhost";
   private Job job = null;
 
   static int printUsage() {
@@ -81,7 +83,7 @@
     JobClient client = new JobClient(conf);
     ClusterStatus cluster = client.getClusterStatus();
     int num_reduces = (int) (cluster.getMaxReduceTasks() * 0.9);
-    String sort_reduces = conf.get("test.sort.reduces_per_host");
+    String sort_reduces = conf.get(REDUCES_PER_HOST);
     if (sort_reduces != null) {
        num_reduces = cluster.getTaskTrackers() * 
                        Integer.parseInt(sort_reduces);

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/WordCount.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/WordCount.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/WordCount.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/WordCount.java Sat Nov 28 20:26:01 2009
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.examples;
 
 import java.io.IOException;

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/dancing/DistributedPentomino.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/dancing/DistributedPentomino.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/dancing/DistributedPentomino.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/dancing/DistributedPentomino.java Sat Nov 28 20:26:01 2009
@@ -109,11 +109,11 @@
     public void setup(Context context) {
       this.context = context;
       Configuration conf = context.getConfiguration();
-      depth = conf.getInt("pent.depth", PENT_DEPTH);
-      width = conf.getInt("pent.width", PENT_WIDTH);
-      height = conf.getInt("pent.height", PENT_HEIGHT);
+      depth = conf.getInt(Pentomino.DEPTH, PENT_DEPTH);
+      width = conf.getInt(Pentomino.WIDTH, PENT_WIDTH);
+      height = conf.getInt(Pentomino.HEIGHT, PENT_HEIGHT);
       pent = (Pentomino) 
-        ReflectionUtils.newInstance(conf.getClass("pent.class", 
+        ReflectionUtils.newInstance(conf.getClass(Pentomino.CLASS, 
                                                   OneSidedPentomino.class), 
                                     conf);
       pent.initialize(width, height);
@@ -172,12 +172,12 @@
     }
 
     Configuration conf = getConf();
-    int width = conf.getInt("pent.width", PENT_WIDTH);
-    int height = conf.getInt("pent.height", PENT_HEIGHT);
-    int depth = conf.getInt("pent.depth", PENT_DEPTH);
-    Class<? extends Pentomino> pentClass = conf.getClass("pent.class", 
+    int width = conf.getInt(Pentomino.WIDTH, PENT_WIDTH);
+    int height = conf.getInt(Pentomino.HEIGHT, PENT_HEIGHT);
+    int depth = conf.getInt(Pentomino.DEPTH, PENT_DEPTH);
+    Class<? extends Pentomino> pentClass = conf.getClass(Pentomino.CLASS, 
       OneSidedPentomino.class, Pentomino.class);
-    int numMaps = conf.getInt("mapred.map.tasks", DEFAULT_MAPS);
+    int numMaps = conf.getInt(JobContext.NUM_MAPS, DEFAULT_MAPS);
     Path output = new Path(args[0]);
     Path input = new Path(output + "_input");
     FileSystem fileSys = FileSystem.get(conf);

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/dancing/Pentomino.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/dancing/Pentomino.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/dancing/Pentomino.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/dancing/Pentomino.java Sat Nov 28 20:26:01 2009
@@ -21,6 +21,10 @@
 import java.util.*;
 
 public class Pentomino {
+  public static final String DEPTH = "mapreduce.pentomino.depth";
+  public static final String WIDTH = "mapreduce.pentomino.width";
+  public static final String HEIGHT = "mapreduce.pentomino.height";
+  public static final String CLASS = "mapreduce.pentomino.class";
 
   /**
    * This interface just is a marker for what types I expect to get back
@@ -56,7 +60,7 @@
       }
       this.shape = new boolean[lines.size()][];
       for(int i=0 ; i < lines.size(); i++) {
-        this.shape[i] = (boolean[]) lines.get(i);
+        this.shape[i] = lines.get(i);
       }
     }
     
@@ -379,7 +383,7 @@
     }
     boolean[] row = new boolean[dancer.getNumberColumns()];
     for(int idx = 0; idx < pieces.size(); ++idx) {
-      Piece piece = (Piece) pieces.get(idx);
+      Piece piece = pieces.get(idx);
       row[idx + pieceBase] = true;
       generateRows(dancer, piece, width, height, false, row, idx == 0);
       if (piece.getFlippable()) {

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/dancing/Sudoku.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/dancing/Sudoku.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/dancing/Sudoku.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/dancing/Sudoku.java Sat Nov 28 20:26:01 2009
@@ -154,7 +154,7 @@
       line = file.readLine();
     }
     size = result.size();
-    board = (int[][]) result.toArray(new int [size][]);
+    board = result.toArray(new int [size][]);
     squareYSize = (int) Math.sqrt(size);
     squareXSize = size / squareYSize;
     file.close();
@@ -236,8 +236,8 @@
       rowValues[i] = false;
     }
     // find the square coordinates
-    int xBox = (int) x / squareXSize;
-    int yBox = (int) y / squareYSize;
+    int xBox = x / squareXSize;
+    int yBox = y / squareYSize;
     // mark the column
     rowValues[x*size + num - 1] = true;
     // mark the row

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/pi/DistSum.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/pi/DistSum.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/pi/DistSum.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/pi/DistSum.java Sat Nov 28 20:26:01 2009
@@ -38,9 +38,9 @@
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.ClusterStatus;
-import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobTracker;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
@@ -68,7 +68,7 @@
   private static final Log LOG = LogFactory.getLog(DistSum.class);
 
   private static final String NAME = DistSum.class.getSimpleName();
-  private static final String N_PARTS = NAME + ".nParts";
+  private static final String N_PARTS = "mapreduce.pi." + NAME + ".nParts";
   /////////////////////////////////////////////////////////////////////////////
   /** DistSum job parameters */
   static class Parameters {
@@ -379,14 +379,14 @@
   public static class MixMachine extends Machine {
     private static final MixMachine INSTANCE = new MixMachine();
     
-    private JobClient jobclient;
+    private Cluster cluster;
 
     /** {@inheritDoc} */
     @Override
     public synchronized void init(Job job) throws IOException {
       final Configuration conf = job.getConfiguration();
-      if (jobclient == null)
-        jobclient = new JobClient(JobTracker.getAddress(conf), conf);
+      if (cluster == null)
+        cluster = new Cluster(JobTracker.getAddress(conf), conf);
       chooseMachine(conf).init(job);
     }
 
@@ -398,9 +398,11 @@
       try {
         for(;; Thread.sleep(2000)) {
           //get cluster status
-          final ClusterStatus status = jobclient.getClusterStatus();
-          final int m = status.getMaxMapTasks() - status.getMapTasks();
-          final int r = status.getMaxReduceTasks() - status.getReduceTasks();
+          final ClusterMetrics status = cluster.getClusterStatus();
+          final int m = 
+            status.getMapSlotCapacity() - status.getOccupiedMapSlots();
+          final int r = 
+            status.getReduceSlotCapacity() - status.getOccupiedReduceSlots();
           if (m >= parts || r >= parts) {
             //favor ReduceSide machine
             final Machine value = r >= parts?
@@ -433,10 +435,10 @@
     SummationWritable.write(sigma, DistSum.class, jobconf);
 
     // disable task timeout
-    jobconf.setLong("mapred.task.timeout", 0);
+    jobconf.setLong(JobContext.TASK_TIMEOUT, 0);
     // do not use speculative execution
-    jobconf.setBoolean("mapred.map.tasks.speculative.execution", false);
-    jobconf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
+    jobconf.setBoolean(JobContext.MAP_SPECULATIVE, false);
+    jobconf.setBoolean(JobContext.REDUCE_SPECULATIVE, false);
 
     return job; 
   }

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/pi/Parser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/pi/Parser.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/pi/Parser.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/pi/Parser.java Sat Nov 28 20:26:01 2009
@@ -1,4 +1,4 @@
- /**
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/pi/math/Bellard.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/pi/math/Bellard.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/pi/math/Bellard.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/pi/math/Bellard.java Sat Nov 28 20:26:01 2009
@@ -1,4 +1,4 @@
- /**
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/terasort/TeraGen.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/terasort/TeraGen.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/terasort/TeraGen.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/terasort/TeraGen.java Sat Nov 28 20:26:01 2009
@@ -21,7 +21,10 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.zip.Checksum;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
@@ -38,19 +41,20 @@
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.util.PureJavaCrc32;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
 /**
- * Generate the official terasort input data set.
+ * Generate the official GraySort input data set.
  * The user specifies the number of rows and the output directory and this
  * class runs a map/reduce program to generate the data.
  * The format of the data is:
  * <ul>
- * <li>(10 bytes key) (10 bytes rowid) (78 bytes filler) \r \n
- * <li>The keys are random characters from the set ' ' .. '~'.
- * <li>The rowid is the right justified row id as a int.
- * <li>The filler consists of 7 runs of 10 characters from 'A' to 'Z'.
+ * <li>(10 bytes key) (constant 2 bytes) (32 bytes rowid) 
+ *     (constant 4 bytes) (48 bytes filler) (constant 4 bytes)
+ * <li>The rowid is the right justified row id as a hex number.
  * </ul>
  *
  * <p>
@@ -58,7 +62,11 @@
  * <b>bin/hadoop jar hadoop-*-examples.jar teragen 10000000000 in-dir</b>
  */
 public class TeraGen extends Configured implements Tool {
+  private static final Log LOG = LogFactory.getLog(TeraSort.class);
 
+  public static enum Counters {CHECKSUM}
+
+  public static String NUM_ROWS = "mapreduce.terasort.num-rows";
   /**
    * An input format that assigns ranges of longs to each mapper.
    */
@@ -159,96 +167,25 @@
     public InputSplit[] getSplits(JobConf job, 
                                   int numSplits) {
       long totalRows = getNumberOfRows(job);
-      long rowsPerSplit = totalRows / numSplits;
-      System.out.println("Generating " + totalRows + " using " + numSplits + 
-                         " maps with step of " + rowsPerSplit);
+      LOG.info("Generating " + totalRows + " using " + numSplits);
       InputSplit[] splits = new InputSplit[numSplits];
       long currentRow = 0;
-      for(int split=0; split < numSplits-1; ++split) {
-        splits[split] = new RangeInputSplit(currentRow, rowsPerSplit);
-        currentRow += rowsPerSplit;
+      for(int split=0; split < numSplits; ++split) {
+        long goal = (long) Math.ceil(totalRows * (double)(split+1) / numSplits);
+        splits[split] = new RangeInputSplit(currentRow, goal - currentRow);
+        currentRow = goal;
       }
-      splits[numSplits-1] = new RangeInputSplit(currentRow, 
-                                                totalRows - currentRow);
       return splits;
     }
 
   }
   
   static long getNumberOfRows(JobConf job) {
-    return job.getLong("terasort.num-rows", 0);
+    return job.getLong(NUM_ROWS, 0);
   }
   
   static void setNumberOfRows(JobConf job, long numRows) {
-    job.setLong("terasort.num-rows", numRows);
-  }
-
-  static class RandomGenerator {
-    private long seed = 0;
-    private static final long mask32 = (1l<<32) - 1;
-    /**
-     * The number of iterations separating the precomputed seeds.
-     */
-    private static final int seedSkip = 128 * 1024 * 1024;
-    /**
-     * The precomputed seed values after every seedSkip iterations.
-     * There should be enough values so that a 2**32 iterations are 
-     * covered.
-     */
-    private static final long[] seeds = new long[]{0L,
-                                                   4160749568L,
-                                                   4026531840L,
-                                                   3892314112L,
-                                                   3758096384L,
-                                                   3623878656L,
-                                                   3489660928L,
-                                                   3355443200L,
-                                                   3221225472L,
-                                                   3087007744L,
-                                                   2952790016L,
-                                                   2818572288L,
-                                                   2684354560L,
-                                                   2550136832L,
-                                                   2415919104L,
-                                                   2281701376L,
-                                                   2147483648L,
-                                                   2013265920L,
-                                                   1879048192L,
-                                                   1744830464L,
-                                                   1610612736L,
-                                                   1476395008L,
-                                                   1342177280L,
-                                                   1207959552L,
-                                                   1073741824L,
-                                                   939524096L,
-                                                   805306368L,
-                                                   671088640L,
-                                                   536870912L,
-                                                   402653184L,
-                                                   268435456L,
-                                                   134217728L,
-                                                  };
-
-    /**
-     * Start the random number generator on the given iteration.
-     * @param initalIteration the iteration number to start on
-     */
-    RandomGenerator(long initalIteration) {
-      int baseIndex = (int) ((initalIteration & mask32) / seedSkip);
-      seed = seeds[baseIndex];
-      for(int i=0; i < initalIteration % seedSkip; ++i) {
-        next();
-      }
-    }
-
-    RandomGenerator() {
-      this(0);
-    }
-
-    long next() {
-      seed = (seed * 3141592621l + 663896637) & mask32;
-      return seed;
-    }
+    job.setLong(NUM_ROWS, numRows);
   }
 
   /**
@@ -260,87 +197,93 @@
 
     private Text key = new Text();
     private Text value = new Text();
-    private RandomGenerator rand;
-    private byte[] keyBytes = new byte[12];
-    private byte[] spaces = "          ".getBytes();
-    private byte[][] filler = new byte[26][];
-    {
-      for(int i=0; i < 26; ++i) {
-        filler[i] = new byte[10];
-        for(int j=0; j<10; ++j) {
-          filler[i][j] = (byte) ('A' + i);
-        }
-      }
-    }
-    
-    /**
-     * Add a random key to the text
-     * @param rowId
-     */
-    private void addKey() {
-      for(int i=0; i<3; i++) {
-        long temp = rand.next() / 52;
-        keyBytes[3 + 4*i] = (byte) (' ' + (temp % 95));
-        temp /= 95;
-        keyBytes[2 + 4*i] = (byte) (' ' + (temp % 95));
-        temp /= 95;
-        keyBytes[1 + 4*i] = (byte) (' ' + (temp % 95));
-        temp /= 95;
-        keyBytes[4*i] = (byte) (' ' + (temp % 95));
-      }
-      key.set(keyBytes, 0, 10);
-    }
-    
-    /**
-     * Add the rowid to the row.
-     * @param rowId
-     */
-    private void addRowId(long rowId) {
-      byte[] rowid = Integer.toString((int) rowId).getBytes();
-      int padSpace = 10 - rowid.length;
-      if (padSpace > 0) {
-        value.append(spaces, 0, 10 - rowid.length);
-      }
-      value.append(rowid, 0, Math.min(rowid.length, 10));
-    }
-
-    /**
-     * Add the required filler bytes. Each row consists of 7 blocks of
-     * 10 characters and 1 block of 8 characters.
-     * @param rowId the current row number
-     */
-    private void addFiller(long rowId) {
-      int base = (int) ((rowId * 8) % 26);
-      for(int i=0; i<7; ++i) {
-        value.append(filler[(base+i) % 26], 0, 10);
-      }
-      value.append(filler[(base+7) % 26], 0, 8);
-    }
+    private Unsigned16 rand = null;
+    private Unsigned16 rowId = null;
+    private Unsigned16 checksum = new Unsigned16();
+    private Checksum crc32 = new PureJavaCrc32();
+    private Unsigned16 total = new Unsigned16();
+    private static final Unsigned16 ONE = new Unsigned16(1);
+    private byte[] buffer = new byte[TeraInputFormat.KEY_LENGTH +
+                                     TeraInputFormat.VALUE_LENGTH];
+    private Counter checksumCounter;
 
     public void map(LongWritable row, NullWritable ignored,
                     OutputCollector<Text, Text> output,
                     Reporter reporter) throws IOException {
-      long rowId = row.get();
       if (rand == null) {
-        // we use 3 random numbers per a row
-        rand = new RandomGenerator(rowId*3);
-      }
-      addKey();
-      value.clear();
-      addRowId(rowId);
-      addFiller(rowId);
+        rowId = new Unsigned16(row.get());
+        rand = Random16.skipAhead(rowId);
+        checksumCounter = reporter.getCounter(Counters.CHECKSUM);
+      }
+      Random16.nextRand(rand);
+      GenSort.generateRecord(buffer, rand, rowId);
+      key.set(buffer, 0, TeraInputFormat.KEY_LENGTH);
+      value.set(buffer, TeraInputFormat.KEY_LENGTH, 
+                TeraInputFormat.VALUE_LENGTH);
       output.collect(key, value);
+      crc32.reset();
+      crc32.update(buffer, 0, 
+                   TeraInputFormat.KEY_LENGTH + TeraInputFormat.VALUE_LENGTH);
+      checksum.set(crc32.getValue());
+      total.add(checksum);
+      rowId.add(ONE);
+    }
+
+    @Override
+    public void close() {
+      if (checksumCounter != null) { checksumCounter.increment(total.getLow8()); } // null if map() never ran
+    }
+  }
 
+  private static void usage() throws IOException {
+    System.err.println("teragen <num rows> <output dir>");
   }
 
   /**
+   * Parse a number that optionally has a suffix that denotes a multiplier.
+   * @param str a string integer with an optional multiplier suffix {k,m,b,t}.
+   * @return the expanded value
+   */
+  private static long parseHumanLong(String str) {
+    char tail = str.charAt(str.length() - 1);
+    long base = 1;
+    switch (tail) {
+    case 't':
+      base *= 1000L * 1000 * 1000 * 1000; // long math: 10^12 overflows int
+      break;
+    case 'b':
+      base *= 1000L * 1000 * 1000;
+      break;
+    case 'm':
+      base *= 1000L * 1000;
+      break;
+    case 'k':
+      base *= 1000L;
+      break;
+    default: // no recognized suffix: the whole string is the number
+    }
+    if (base != 1) {
+      str = str.substring(0, str.length() - 1); // strip the suffix character
+    }
+    return Long.parseLong(str) * base;
+  }
+  
+  /**
    * @param args the cli arguments
    */
   public int run(String[] args) throws IOException {
     JobConf job = (JobConf) getConf();
-    setNumberOfRows(job, Long.parseLong(args[0]));
-    FileOutputFormat.setOutputPath(job, new Path(args[1]));
+    if (args.length != 2) {
+      usage();
+      return 1;
+    }
+    setNumberOfRows(job, parseHumanLong(args[0]));
+    Path outputDir = new Path(args[1]);
+    if (outputDir.getFileSystem(job).exists(outputDir)) {
+      throw new IOException("Output directory " + outputDir + 
+                            " already exists.");
+    }
+    FileOutputFormat.setOutputPath(job, outputDir);
     job.setJobName("TeraGen");
     job.setJarByClass(TeraGen.class);
     job.setMapperClass(SortGenMapper.class);

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/terasort/TeraInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/terasort/TeraInputFormat.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/terasort/TeraInputFormat.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/terasort/TeraInputFormat.java Sat Nov 28 20:26:01 2009
@@ -18,15 +18,15 @@
 
 package org.apache.hadoop.examples.terasort;
 
+import java.io.DataOutputStream;
+import java.io.EOFException;
 import java.io.IOException;
 import java.util.ArrayList;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileSplit;
@@ -37,6 +37,7 @@
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.IndexedSortable;
 import org.apache.hadoop.util.QuickSort;
+import org.apache.hadoop.util.StringUtils;
 
 /**
  * An input format that reads the first 10 characters of each line as the key
@@ -46,10 +47,43 @@
 public class TeraInputFormat extends FileInputFormat<Text,Text> {
 
   static final String PARTITION_FILENAME = "_partition.lst";
-  static final String SAMPLE_SIZE = "terasort.partitions.sample";
+  private static final String NUM_PARTITIONS = "terasort.num.partitions";
+  private static final String SAMPLE_SIZE = "terasort.partitions.sample";
+  static final int KEY_LENGTH = 10;
+  static final int VALUE_LENGTH = 90;
+  static final int RECORD_LENGTH = KEY_LENGTH + VALUE_LENGTH;
   private static JobConf lastConf = null;
   private static InputSplit[] lastResult = null;
 
+  static class TeraFileSplit extends FileSplit {
+    private String[] locations;
+    public TeraFileSplit() {}
+    public TeraFileSplit(Path file, long start, long length, String[] hosts) {
+      super(file, start, length, hosts);
+      locations = hosts;
+    }
+    protected void setLocations(String[] hosts) {
+      locations = hosts;
+    }
+    @Override
+    public String[] getLocations() {
+      return locations;
+    }
+    public String toString() {
+      StringBuffer result = new StringBuffer();
+      result.append(getPath());
+      result.append(" from ");
+      result.append(getStart());
+      result.append(" length ");
+      result.append(getLength());
+      for(String host: getLocations()) {
+        result.append(" ");
+        result.append(host);
+      }
+      return result.toString();
+    }
+  }
+
   static class TextSampler implements IndexedSortable {
     private ArrayList<Text> records = new ArrayList<Text>();
 
@@ -67,7 +101,9 @@
     }
 
     public void addKey(Text key) {
-      records.add(new Text(key));
+      synchronized (this) {
+        records.add(new Text(key));
+      }
     }
 
     /**
@@ -80,7 +116,7 @@
     Text[] createPartitions(int numPartitions) {
       int numRecords = records.size();
       System.out.println("Making " + numPartitions + " from " + numRecords + 
-                         " records");
+                         " sampled records");
       if (numPartitions > numRecords) {
         throw new IllegalArgumentException
           ("Requested more partitions than input keys (" + numPartitions +
@@ -88,7 +124,6 @@
       }
       new QuickSort().sort(this, 0, records.size());
       float stepSize = numRecords / (float) numPartitions;
-      System.out.println("Step size is " + stepSize);
       Text[] result = new Text[numPartitions-1];
       for(int i=1; i < numPartitions; ++i) {
         result[i-1] = records.get(Math.round(stepSize * i));
@@ -105,54 +140,86 @@
    * @param partFile where to write the output file to
    * @throws IOException if something goes wrong
    */
-  public static void writePartitionFile(JobConf conf, 
+  public static void writePartitionFile(final JobConf conf, 
                                         Path partFile) throws IOException {
-    TeraInputFormat inFormat = new TeraInputFormat();
-    TextSampler sampler = new TextSampler();
-    Text key = new Text();
-    Text value = new Text();
+    long t1 = System.currentTimeMillis();
+    final TeraInputFormat inFormat = new TeraInputFormat();
+    final TextSampler sampler = new TextSampler();
     int partitions = conf.getNumReduceTasks();
     long sampleSize = conf.getLong(SAMPLE_SIZE, 100000);
-    InputSplit[] splits = inFormat.getSplits(conf, conf.getNumMapTasks());
-    int samples = Math.min(10, splits.length);
-    long recordsPerSample = sampleSize / samples;
-    int sampleStep = splits.length / samples;
-    long records = 0;
+    final InputSplit[] splits = inFormat.getSplits(conf, conf.getNumMapTasks());
+    long t2 = System.currentTimeMillis();
+    System.out.println("Computing input splits took " + (t2 - t1) + "ms");
+    int samples = Math.min(conf.getInt(NUM_PARTITIONS, 10), splits.length);
+    System.out.println("Sampling " + samples + " splits of " + splits.length);
+    final long recordsPerSample = sampleSize / samples;
+    final int sampleStep = splits.length / samples;
+    Thread[] samplerReader = new Thread[samples];
     // take N samples from different parts of the input
     for(int i=0; i < samples; ++i) {
-      RecordReader<Text,Text> reader = 
-        inFormat.getRecordReader(splits[sampleStep * i], conf, null);
-      while (reader.next(key, value)) {
-        sampler.addKey(key);
-        records += 1;
-        if ((i+1) * recordsPerSample <= records) {
-          break;
+      final int idx = i;
+      samplerReader[i] = 
+        new Thread ("Sampler Reader " + idx) {
+        {
+          setDaemon(true);
         }
-      }
+        public void run() {
+          Text key = new Text();
+          Text value = new Text();
+          long records = 0;
+          try {
+            RecordReader<Text,Text> reader = 
+              inFormat.getRecordReader(splits[sampleStep * idx], conf, null);
+            while (reader.next(key, value)) {
+              sampler.addKey(key);
+              records += 1;
+              if (recordsPerSample <= records) {
+                break;
+              }
+            }
+          } catch (IOException ie){
+            System.err.println("Got an exception while reading splits " +
+                StringUtils.stringifyException(ie));
+            System.exit(-1);
+          }
+        }
+      };
+      samplerReader[i].start();
     }
     FileSystem outFs = partFile.getFileSystem(conf);
-    if (outFs.exists(partFile)) {
-      outFs.delete(partFile, false);
+    DataOutputStream writer = outFs.create(partFile, true, 64*1024, (short) 10, 
+                                           outFs.getDefaultBlockSize());
+    for (int i = 0; i < samples; i++) {
+      try {
+        samplerReader[i].join();
+      } catch (InterruptedException e) {
+      }
     }
-    SequenceFile.Writer writer = 
-      SequenceFile.createWriter(outFs, conf, partFile, Text.class, 
-                                NullWritable.class);
-    NullWritable nullValue = NullWritable.get();
     for(Text split : sampler.createPartitions(partitions)) {
-      writer.append(split, nullValue);
+      split.write(writer);
     }
     writer.close();
+    long t3 = System.currentTimeMillis();
+    System.out.println("Computing parititions took " + (t3 - t2) + "ms");
   }
 
   static class TeraRecordReader implements RecordReader<Text,Text> {
-    private LineRecordReader in;
-    private LongWritable junk = new LongWritable();
-    private Text line = new Text();
-    private static int KEY_LENGTH = 10;
+    private FSDataInputStream in;
+    private long offset;
+    private long length;
+    private static final int RECORD_LENGTH = KEY_LENGTH + VALUE_LENGTH;
+    private byte[] buffer = new byte[RECORD_LENGTH];
 
     public TeraRecordReader(Configuration job, 
                             FileSplit split) throws IOException {
-      in = new LineRecordReader(job, split);
+      Path p = split.getPath();
+      FileSystem fs = p.getFileSystem(job);
+      in = fs.open(p);
+      long start = split.getStart();
+      // find the offset to start at a record boundary
+      offset = (RECORD_LENGTH - (start % RECORD_LENGTH)) % RECORD_LENGTH;
+      in.seek(start + offset);
+      length = split.getLength();
     }
 
     public void close() throws IOException {
@@ -172,23 +239,29 @@
     }
 
     public float getProgress() throws IOException {
-      return in.getProgress();
+      return (float) offset / length;
     }
 
     public boolean next(Text key, Text value) throws IOException {
-      if (in.next(junk, line)) {
-        if (line.getLength() < KEY_LENGTH) {
-          key.set(line);
-          value.clear();
-        } else {
-          byte[] bytes = line.getBytes();
-          key.set(bytes, 0, KEY_LENGTH);
-          value.set(bytes, KEY_LENGTH, line.getLength() - KEY_LENGTH);
-        }
-        return true;
-      } else {
+      if (offset >= length) {
         return false;
       }
+      int read = 0;
+      while (read < RECORD_LENGTH) {
+        long newRead = in.read(buffer, read, RECORD_LENGTH - read);
+        if (newRead == -1) {
+          if (read == 0) {
+            return false;
+          } else {
+            throw new EOFException("read past eof");
+          }
+        }
+        read += newRead;
+      }
+      key.set(buffer, 0, KEY_LENGTH);
+      value.set(buffer, KEY_LENGTH, VALUE_LENGTH);
+      offset += RECORD_LENGTH;
+      return true;
     }
   }
 
@@ -201,12 +274,28 @@
   }
 
   @Override
+  protected FileSplit makeSplit(Path file, long start, long length, 
+                                String[] hosts) {
+    return new TeraFileSplit(file, start, length, hosts);
+  }
+
+  @Override
   public InputSplit[] getSplits(JobConf conf, int splits) throws IOException {
     if (conf == lastConf) {
       return lastResult;
     }
+    long t1, t2, t3;
+    t1 = System.currentTimeMillis();
     lastConf = conf;
     lastResult = super.getSplits(conf, splits);
+    t2 = System.currentTimeMillis();
+    System.out.println("Spent " + (t2 - t1) + "ms computing base-splits.");
+    if (conf.getBoolean("terasort.use.terascheduler", true)) {
+      TeraScheduler scheduler = new TeraScheduler((FileSplit[]) lastResult, conf);
+      lastResult = scheduler.getNewFileSplits();
+      t3 = System.currentTimeMillis(); 
+      System.out.println("Spent " + (t3 - t2) + "ms computing TeraScheduler splits.");
+    }
     return lastResult;
   }
 }

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/terasort/TeraOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/terasort/TeraOutputFormat.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/terasort/TeraOutputFormat.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/terasort/TeraOutputFormat.java Sat Nov 28 20:26:01 2009
@@ -18,22 +18,29 @@
 
 package org.apache.hadoop.examples.terasort;
 
-import java.io.DataOutputStream;
 import java.io.IOException;
 
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileOutputCommitter;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.InvalidJobConfException;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.OutputCommitter;
 import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptContext;
 import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.util.Progressable;
 
 /**
- * A streamlined text output format that writes key, value, and "\r\n".
+ * An output format that writes the key and value appended together.
  */
-public class TeraOutputFormat extends TextOutputFormat<Text,Text> {
+public class TeraOutputFormat extends FileOutputFormat<Text,Text> {
   static final String FINAL_SYNC_ATTRIBUTE = "terasort.final.sync";
 
   /**
@@ -50,28 +57,38 @@
     return conf.getBoolean(FINAL_SYNC_ATTRIBUTE, false);
   }
 
-  static class TeraRecordWriter extends LineRecordWriter<Text,Text> {
-    private static final byte[] newLine = "\r\n".getBytes();
+  static class TeraRecordWriter implements RecordWriter<Text,Text> {
     private boolean finalSync = false;
+    private FSDataOutputStream out;
 
-    public TeraRecordWriter(DataOutputStream out,
+    public TeraRecordWriter(FSDataOutputStream out,
                             JobConf conf) {
-      super(out);
       finalSync = getFinalSync(conf);
+      this.out = out;
     }
 
     public synchronized void write(Text key, 
                                    Text value) throws IOException {
       out.write(key.getBytes(), 0, key.getLength());
       out.write(value.getBytes(), 0, value.getLength());
-      out.write(newLine, 0, newLine.length);
     }
     
-    public void close() throws IOException {
+    public void close(Reporter reporter) throws IOException {
       if (finalSync) {
-        ((FSDataOutputStream) out).sync();
+        out.sync();
       }
-      super.close(null);
+      out.close();
+    }
+  }
+
+  @Override
+  public void checkOutputSpecs(FileSystem ignored, 
+                               JobConf job
+                              ) throws InvalidJobConfException, IOException {
+    // Ensure that the output directory is set; its existence is not checked here
+    Path outDir = getOutputPath(job);
+    if (outDir == null) {
+      throw new InvalidJobConfException("Output directory not set in JobConf.");
     }
   }
 
@@ -85,4 +102,25 @@
     FSDataOutputStream fileOut = fs.create(new Path(dir, name), progress);
     return new TeraRecordWriter(fileOut, job);
   }
+  
+  public static class TeraOutputCommitter extends FileOutputCommitter {
+
+    @Override
+    public void commitJob(JobContext jobContext) {
+    }
+
+    @Override
+    public boolean needsTaskCommit(TaskAttemptContext taskContext) {
+      return taskContext.getTaskAttemptID().getTaskID().getTaskType() ==
+               TaskType.REDUCE;
+    }
+
+    @Override
+    public void setupJob(JobContext jobContext) {
+    }
+
+    @Override
+    public void setupTask(TaskAttemptContext taskContext) {
+    }
+  }
 }

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/terasort/TeraSort.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/terasort/TeraSort.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/terasort/TeraSort.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/terasort/TeraSort.java Sat Nov 28 20:26:01 2009
@@ -18,20 +18,19 @@
 
 package org.apache.hadoop.examples.terasort;
 
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.examples.terasort.TeraOutputFormat.TeraOutputCommitter;
+import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobClient;
@@ -88,13 +87,13 @@
         if (key.getLength() <= level) {
           return child[0].findPartition(key);
         }
-        return child[key.getBytes()[level]].findPartition(key);
+        return child[key.getBytes()[level] & 0xff].findPartition(key);
       }
       void setChild(int idx, TrieNode child) {
         this.child[idx] = child;
       }
       void print(PrintStream strm) throws IOException {
-        for(int ch=0; ch < 255; ++ch) {
+        for(int ch=0; ch < 256; ++ch) {
           for(int i = 0; i < 2*getLevel(); ++i) {
             strm.print(' ');
           }
@@ -123,7 +122,7 @@
       }
       int findPartition(Text key) {
         for(int i=lower; i<upper; ++i) {
-          if (splitPoints[i].compareTo(key) >= 0) {
+          if (splitPoints[i].compareTo(key) > 0) {
             return i;
           }
         }
@@ -150,16 +149,15 @@
      */
     private static Text[] readPartitions(FileSystem fs, Path p, 
                                          JobConf job) throws IOException {
-      SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, job);
-      List<Text> parts = new ArrayList<Text>();
-      Text key = new Text();
-      NullWritable value = NullWritable.get();
-      while (reader.next(key, value)) {
-        parts.add(key);
-        key = new Text();
+      int reduces = job.getNumReduceTasks();
+      Text[] result = new Text[reduces - 1];
+      DataInputStream reader = fs.open(p);
+      for(int i=0; i < reduces - 1; ++i) {
+        result[i] = new Text();
+        result[i].readFields(reader);
       }
       reader.close();
-      return parts.toArray(new Text[parts.size()]);  
+      return result;
     }
 
     /**
@@ -197,7 +195,7 @@
                                      maxDepth);
       }
       // pick up the rest
-      trial.getBytes()[depth] = 127;
+      trial.getBytes()[depth] = (byte) 255;
       result.child[255] = buildTrie(splits, currentBound, upper, trial,
                                     maxDepth);
       return result;
@@ -223,27 +221,77 @@
     
   }
   
+  /**
+   * A total order partitioner that assigns keys based on their first 
+   * PREFIX_LENGTH bytes, assuming a flat distribution.
+   */
+  public static class SimplePartitioner implements Partitioner<Text, Text>{
+    int prefixesPerReduce;
+    private static final int PREFIX_LENGTH = 3;
+    public void configure(JobConf job) {
+      prefixesPerReduce = (int) Math.ceil((1 << (8 * PREFIX_LENGTH)) / 
+                                          (float) job.getNumReduceTasks());
+    }
+    @Override
+    public int getPartition(Text key, Text value, int numPartitions) {
+      byte[] bytes = key.getBytes();
+      int len = Math.min(PREFIX_LENGTH, key.getLength());
+      int prefix = 0;
+      for(int i=0; i < len; ++i) {
+        prefix = (prefix << 8) | (0xff & bytes[i]);
+      }
+      return prefix / prefixesPerReduce;
+    }
+  }
+
+  public static boolean getUseSimplePartitioner(Configuration conf) {
+    return conf.getBoolean("terasort.partitioner.simple", false);
+  }
+
+  public static void setUseSimplePartitioner(Configuration conf,
+                                             boolean value) {
+    conf.setBoolean("terasort.partitioner.simple", value);
+  }
+
   public int run(String[] args) throws Exception {
     LOG.info("starting");
     JobConf job = (JobConf) getConf();
     Path inputDir = new Path(args[0]);
-    inputDir = inputDir.makeQualified(inputDir.getFileSystem(job));
-    Path partitionFile = new Path(inputDir, TeraInputFormat.PARTITION_FILENAME);
-    URI partitionUri = new URI(partitionFile.toString() +
-                               "#" + TeraInputFormat.PARTITION_FILENAME);
-    TeraInputFormat.setInputPaths(job, new Path(args[0]));
-    FileOutputFormat.setOutputPath(job, new Path(args[1]));
+    Path outputDir = new Path(args[1]);
+    boolean useSimplePartitioner = getUseSimplePartitioner(job);
+    FileSystem outputFileSystem = outputDir.getFileSystem(job);
+    outputDir = outputDir.makeQualified(outputFileSystem);
+    if (outputFileSystem.exists(outputDir)) {
+      throw new IOException("Output directory " + outputDir + 
+                            " already exists.");
+    }
+    TeraInputFormat.setInputPaths(job, inputDir);
+    FileOutputFormat.setOutputPath(job, outputDir);
     job.setJobName("TeraSort");
     job.setJarByClass(TeraSort.class);
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(Text.class);
     job.setInputFormat(TeraInputFormat.class);
     job.setOutputFormat(TeraOutputFormat.class);
-    job.setPartitionerClass(TotalOrderPartitioner.class);
-    TeraInputFormat.writePartitionFile(job, partitionFile);
-    DistributedCache.addCacheFile(partitionUri, job);
-    DistributedCache.createSymlink(job);
-    job.setInt("dfs.replication", 1);
+    if (useSimplePartitioner) {
+      job.setPartitionerClass(SimplePartitioner.class);
+    } else {
+      long start = System.currentTimeMillis();
+      Path partitionFile = new Path(outputDir, 
+                                    TeraInputFormat.PARTITION_FILENAME);
+      URI partitionUri = new URI(partitionFile.toString() +
+                                 "#" + TeraInputFormat.PARTITION_FILENAME);
+      TeraInputFormat.writePartitionFile(job, partitionFile);
+      DistributedCache.addCacheFile(partitionUri, job);
+      DistributedCache.createSymlink(job);    
+      long end = System.currentTimeMillis();
+      System.out.println("Spent " + (end - start) + "ms computing partitions.");
+      job.setPartitionerClass(TotalOrderPartitioner.class);
+    }
+    job.setOutputCommitter(TeraOutputCommitter.class);
+    
+    job.setInt("dfs.replication", 
+               job.getInt("terasort.output.replication", 1));
     TeraOutputFormat.setFinalSync(job, true);
     JobClient.runJob(job);
     LOG.info("done");

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/terasort/TeraValidate.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/terasort/TeraValidate.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/terasort/TeraValidate.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/terasort/TeraValidate.java Sat Nov 28 20:26:01 2009
@@ -20,9 +20,11 @@
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.zip.Checksum;
 
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.FileSplit;
@@ -31,8 +33,10 @@
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Partitioner;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.PureJavaCrc32;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -50,14 +54,24 @@
  * will have the problem report.
  */
 public class TeraValidate extends Configured implements Tool {
-  private static final Text error = new Text("error");
+  private static final Text ERROR = new Text("error");
+  private static final Text CHECKSUM = new Text("checksum");
+  
+  private static String textifyBytes(Text t) {
+    BytesWritable b = new BytesWritable();
+    b.set(t.getBytes(), 0, t.getLength());
+    return b.toString();
+  }
 
   static class ValidateMapper extends MapReduceBase 
       implements Mapper<Text,Text,Text,Text> {
     private Text lastKey;
     private OutputCollector<Text,Text> output;
     private String filename;
-    
+    private Unsigned16 checksum = new Unsigned16();
+    private Unsigned16 tmp = new Unsigned16();
+    private Checksum crc32 = new PureJavaCrc32();
+
     /**
      * Get the final part of the input name
      * @param split the input split
@@ -67,26 +81,38 @@
       return split.getPath().getName();
     }
 
+    private int getPartition(FileSplit split) {
+      return Integer.parseInt(split.getPath().getName().substring(5));
+    }
+
     public void map(Text key, Text value, OutputCollector<Text,Text> output,
                     Reporter reporter) throws IOException {
       if (lastKey == null) {
-        filename = getFilename((FileSplit) reporter.getInputSplit());
+        FileSplit fs = (FileSplit) reporter.getInputSplit();
+        filename = getFilename(fs);
         output.collect(new Text(filename + ":begin"), key);
         lastKey = new Text();
         this.output = output;
       } else {
         if (key.compareTo(lastKey) < 0) {
-          output.collect(error, new Text("misorder in " + filename + 
-                                         " last: '" + lastKey + 
-                                         "' current: '" + key + "'"));
+          output.collect(ERROR, new Text("misorder in " + filename + 
+                                         " between " + textifyBytes(lastKey) + 
+                                         " and " + textifyBytes(key)));
         }
       }
+      // compute the crc of the key and value and add it to the sum
+      crc32.reset();
+      crc32.update(key.getBytes(), 0, key.getLength());
+      crc32.update(value.getBytes(), 0, value.getLength());
+      tmp.set(crc32.getValue());
+      checksum.add(tmp);
       lastKey.set(key);
     }
     
     public void close() throws IOException {
       if (lastKey != null) {
         output.collect(new Text(filename + ":end"), lastKey);
+        output.collect(CHECKSUM, new Text(checksum.toString()));
       }
     }
   }
@@ -104,20 +130,31 @@
     public void reduce(Text key, Iterator<Text> values,
                        OutputCollector<Text, Text> output, 
                        Reporter reporter) throws IOException {
-      if (error.equals(key)) {
+      if (ERROR.equals(key)) {
         while(values.hasNext()) {
           output.collect(key, values.next());
         }
+      } else if (CHECKSUM.equals(key)) {
+        Unsigned16 tmp = new Unsigned16();
+        Unsigned16 sum = new Unsigned16();
+        while (values.hasNext()) {
+          String val = values.next().toString();
+          tmp.set(val);
+          sum.add(tmp);
+        }
+        output.collect(CHECKSUM, new Text(sum.toString()));
       } else {
         Text value = values.next();
         if (firstKey) {
           firstKey = false;
         } else {
           if (value.compareTo(lastValue) < 0) {
-            output.collect(error, 
-                           new Text("misordered keys last: " + 
-                                    lastKey + " '" + lastValue +
-                                    "' current: " + key + " '" + value + "'"));
+            output.collect(ERROR, 
+                           new Text("bad key partitioning:\n  file " + 
+                                    lastKey + " key " + 
+                                    textifyBytes(lastValue) +
+                                    "\n  file " + key + " key " + 
+                                    textifyBytes(value)));
           }
         }
         lastKey.set(key);
@@ -127,8 +164,16 @@
     
   }
 
+  private static void usage() throws IOException {
+    System.err.println("teravalidate <out-dir> <report-dir>");
+  }
+
   public int run(String[] args) throws Exception {
     JobConf job = (JobConf) getConf();
+    if (args.length != 2) {
+      usage();
+      return 1;
+    }
     TeraInputFormat.setInputPaths(job, new Path(args[0]));
     FileOutputFormat.setOutputPath(job, new Path(args[1]));
     job.setJobName("TeraValidate");
@@ -140,7 +185,8 @@
     // force a single reducer
     job.setNumReduceTasks(1);
     // force a single split 
-    job.setLong("mapred.min.split.size", Long.MAX_VALUE);
+    job.setLong(org.apache.hadoop.mapreduce.lib.input.
+                FileInputFormat.SPLIT_MINSIZE, Long.MAX_VALUE);
     job.setInputFormat(TeraInputFormat.class);
     JobClient.runJob(job);
     return 0;

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/terasort/package.html
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/terasort/package.html?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/terasort/package.html (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/terasort/package.html Sat Nov 28 20:26:01 2009
@@ -102,8 +102,9 @@
 1800 tasks to generate a total of 10 billion rows in HDFS, with a
 block size of 1024 MB.
 TeraSort was configured with 1800 maps and 1800 reduces, and
-<i>io.sort.mb</i>,
-<i>io.sort.factor</i>, <i>fs.inmemory.size.mb</i>, and task heap size
+<i>mapreduce.task.io.sort.mb</i>,
+<i>mapreduce.task.io.sort.factor</i>,
+<i>fs.inmemory.size.mb</i>, and task heap size
 sufficient that transient data was never spilled to disk, other than at the
 end of the map. The sampler looked at 100,000 keys to determine the
 reduce boundaries, which led to imperfect balancing with reduce

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/pipes/conf/word-part.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/pipes/conf/word-part.xml?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/pipes/conf/word-part.xml (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/pipes/conf/word-part.xml Sat Nov 28 20:26:01 2009
@@ -2,22 +2,22 @@
 <configuration>
 
 <property>
-  <name>mapred.reduce.tasks</name>
+  <name>mapreduce.job.reduces</name>
   <value>2</value>
 </property>
 
 <property>
-  <name>hadoop.pipes.executable</name>
+  <name>mapreduce.pipes.executable</name>
   <value>hdfs:/examples/bin/wordcount-part</value>
 </property>
 
 <property>
-  <name>hadoop.pipes.java.recordreader</name>
+  <name>mapreduce.pipes.isjavarecordreader</name>
   <value>true</value>
 </property>
 
 <property>
-  <name>hadoop.pipes.java.recordwriter</name>
+  <name>mapreduce.pipes.isjavarecordwriter</name>
   <value>true</value>
 </property>
 

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/pipes/conf/word.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/pipes/conf/word.xml?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/pipes/conf/word.xml (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/pipes/conf/word.xml Sat Nov 28 20:26:01 2009
@@ -2,12 +2,12 @@
 <configuration>
 
 <property>
-  <name>mapred.reduce.tasks</name>
+  <name>mapreduce.job.reduces</name>
   <value>2</value>
 </property>
 
 <property>
-  <name>hadoop.pipes.executable</name>
+  <name>mapreduce.pipes.executable</name>
   <value>/examples/bin/wordcount-simple#wordcount-simple</value>
   <description> Executable path is given as "path#executable-name"
                 so that the executable will have a symlink in the working directory.
@@ -16,12 +16,12 @@
 </property>
 
 <property>
-  <name>hadoop.pipes.java.recordreader</name>
+  <name>mapreduce.pipes.isjavarecordreader</name>
   <value>true</value>
 </property>
 
 <property>
-  <name>hadoop.pipes.java.recordwriter</name>
+  <name>mapreduce.pipes.isjavarecordwriter</name>
   <value>true</value>
 </property>
 

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/pipes/impl/sort.cc
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/pipes/impl/sort.cc?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/pipes/impl/sort.cc (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/pipes/impl/sort.cc Sat Nov 28 20:26:01 2009
@@ -52,7 +52,7 @@
   }
 };
 
-const std::string SortMap::MAP_KEEP_PERCENT("hadoop.sort.map.keep.percent");
+const std::string SortMap::MAP_KEEP_PERCENT("mapreduce.loadgen.sort.map.preserve.percent");
 
 class SortReduce: public HadoopPipes::Reducer {
 private:
@@ -87,7 +87,7 @@
 };
 
 const std::string 
-  SortReduce::REDUCE_KEEP_PERCENT("hadoop.sort.reduce.keep.percent");
+  SortReduce::REDUCE_KEEP_PERCENT("mapreduce.loadgen.sort.reduce.preserve.percent");
 
 int main(int argc, char *argv[]) {
   return HadoopPipes::runTask(HadoopPipes::TemplateFactory<SortMap,

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/pipes/impl/wordcount-nopipe.cc
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/pipes/impl/wordcount-nopipe.cc?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/pipes/impl/wordcount-nopipe.cc (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/pipes/impl/wordcount-nopipe.cc Sat Nov 28 20:26:01 2009
@@ -118,8 +118,8 @@
 public:
   WordCountWriter(HadoopPipes::ReduceContext& context) {
     const HadoopPipes::JobConf* job = context.getJobConf();
-    int part = job->getInt("mapred.task.partition");
-    std::string outDir = job->get("mapred.work.output.dir");
+    int part = job->getInt("mapreduce.task.partition");
+    std::string outDir = job->get("mapreduce.task.output.dir");
     // remove the file: schema substring
     std::string::size_type posn = outDir.find(":");
     HADOOP_ASSERT(posn != std::string::npos, 

Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Nov 28 20:26:01 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/java:713112
 /hadoop/core/trunk/src/mapred:776175-785643
-/hadoop/mapreduce/trunk/src/java:804974-807678
+/hadoop/mapreduce/trunk/src/java:804974-884916



Mime
View raw message