hadoop-common-commits mailing list archives

From tomwh...@apache.org
Subject svn commit: r1235548 [5/8] - in /hadoop/common/branches/branch-1: ./ src/core/org/apache/hadoop/conf/ src/core/org/apache/hadoop/io/ src/mapred/org/apache/hadoop/mapreduce/ src/mapred/org/apache/hadoop/mapreduce/lib/db/ src/mapred/org/apache/hadoop/map...
Date Tue, 24 Jan 2012 23:22:01 GMT
Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,418 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.partition;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Utility for collecting samples and writing a partition file for
+ * {@link TotalOrderPartitioner}.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class InputSampler<K,V> extends Configured implements Tool  {
+
+  private static final Log LOG = LogFactory.getLog(InputSampler.class);
+
+  static int printUsage() {
+    System.out.println("sampler -r <reduces>\n" +
+      "      [-inFormat <input format class>]\n" +
+      "      [-keyClass <map input & output key class>]\n" +
+      "      [-splitRandom <double pcnt> <numSamples> <maxsplits> | " +
+      "             // Sample from random splits at random (general)\n" +
+      "       -splitSample <numSamples> <maxsplits> | " +
+      "             // Sample from first records in splits (random data)\n"+
+      "       -splitInterval <double pcnt> <maxsplits>]" +
+      "             // Sample from splits at intervals (sorted data)");
+    System.out.println("Default sampler: -splitRandom 0.1 10000 10");
+    ToolRunner.printGenericCommandUsage(System.out);
+    return -1;
+  }
+
+  public InputSampler(Configuration conf) {
+    setConf(conf);
+  }
+
+  /**
+   * Interface to sample using an 
+   * {@link org.apache.hadoop.mapreduce.InputFormat}.
+   */
+  public interface Sampler<K,V> {
+    /**
+     * For a given job, collect and return a subset of the keys from the
+     * input data.
+     */
+    K[] getSample(InputFormat<K,V> inf, Job job) 
+    throws IOException, InterruptedException;
+  }
+
+  /**
+   * Samples the first n records from s splits.
+   * Inexpensive way to sample random data.
+   */
+  public static class SplitSampler<K,V> implements Sampler<K,V> {
+
+    private final int numSamples;
+    private final int maxSplitsSampled;
+
+    /**
+     * Create a SplitSampler sampling <em>all</em> splits.
+     * Takes the first numSamples / numSplits records from each split.
+     * @param numSamples Total number of samples to obtain from all selected
+     *                   splits.
+     */
+    public SplitSampler(int numSamples) {
+      this(numSamples, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Create a new SplitSampler.
+     * @param numSamples Total number of samples to obtain from all selected
+     *                   splits.
+     * @param maxSplitsSampled The maximum number of splits to examine.
+     */
+    public SplitSampler(int numSamples, int maxSplitsSampled) {
+      this.numSamples = numSamples;
+      this.maxSplitsSampled = maxSplitsSampled;
+    }
+
+    /**
+     * From each split sampled, take the first numSamples / numSplits records.
+     */
+    @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
+    public K[] getSample(InputFormat<K,V> inf, Job job) 
+        throws IOException, InterruptedException {
+      List<InputSplit> splits = inf.getSplits(job);
+      ArrayList<K> samples = new ArrayList<K>(numSamples);
+      int splitsToSample = Math.min(maxSplitsSampled, splits.size());
+      int samplesPerSplit = numSamples / splitsToSample;
+      long records = 0;
+      for (int i = 0; i < splitsToSample; ++i) {
+        TaskAttemptContext samplingContext = new TaskAttemptContext(
+            job.getConfiguration(), new TaskAttemptID());
+        RecordReader<K,V> reader = inf.createRecordReader(
+            splits.get(i), samplingContext);
+        reader.initialize(splits.get(i), samplingContext);
+        while (reader.nextKeyValue()) {
+          samples.add(ReflectionUtils.copy(job.getConfiguration(),
+                                           reader.getCurrentKey(), null));
+          ++records;
+          if ((i+1) * samplesPerSplit <= records) {
+            break;
+          }
+        }
+        reader.close();
+      }
+      return (K[])samples.toArray();
+    }
+  }
+
+  /**
+   * Sample from random points in the input.
+   * General-purpose sampler. Takes numSamples / maxSplitsSampled inputs from
+   * each split.
+   */
+  public static class RandomSampler<K,V> implements Sampler<K,V> {
+    private double freq;
+    private final int numSamples;
+    private final int maxSplitsSampled;
+
+    /**
+     * Create a new RandomSampler sampling <em>all</em> splits.
+     * This will read every split at the client, which is very expensive.
+     * @param freq Probability with which a key will be chosen.
+     * @param numSamples Total number of samples to obtain from all selected
+     *                   splits.
+     */
+    public RandomSampler(double freq, int numSamples) {
+      this(freq, numSamples, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Create a new RandomSampler.
+     * @param freq Probability with which a key will be chosen.
+     * @param numSamples Total number of samples to obtain from all selected
+     *                   splits.
+     * @param maxSplitsSampled The maximum number of splits to examine.
+     */
+    public RandomSampler(double freq, int numSamples, int maxSplitsSampled) {
+      this.freq = freq;
+      this.numSamples = numSamples;
+      this.maxSplitsSampled = maxSplitsSampled;
+    }
+
+    /**
+     * Randomize the split order, then take the specified number of keys from
+     * each split sampled, where each key is selected with the specified
+     * probability and possibly replaced by a subsequently selected key when
+     * the quota of keys from that split is satisfied.
+     */
+    @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
+    public K[] getSample(InputFormat<K,V> inf, Job job) 
+        throws IOException, InterruptedException {
+      List<InputSplit> splits = inf.getSplits(job);
+      ArrayList<K> samples = new ArrayList<K>(numSamples);
+      int splitsToSample = Math.min(maxSplitsSampled, splits.size());
+
+      Random r = new Random();
+      long seed = r.nextLong();
+      r.setSeed(seed);
+      LOG.debug("seed: " + seed);
+      // shuffle splits
+      for (int i = 0; i < splits.size(); ++i) {
+        InputSplit tmp = splits.get(i);
+        int j = r.nextInt(splits.size());
+        splits.set(i, splits.get(j));
+        splits.set(j, tmp);
+      }
+      // our target rate is in terms of the maximum number of sample splits,
+      // but we accept the possibility of sampling additional splits to hit
+      // the target sample keyset
+      for (int i = 0; i < splitsToSample ||
+                     (i < splits.size() && samples.size() < numSamples); ++i) {
+        TaskAttemptContext samplingContext = new TaskAttemptContext(
+            job.getConfiguration(), new TaskAttemptID());
+        RecordReader<K,V> reader = inf.createRecordReader(
+            splits.get(i), samplingContext);
+        reader.initialize(splits.get(i), samplingContext);
+        while (reader.nextKeyValue()) {
+          if (r.nextDouble() <= freq) {
+            if (samples.size() < numSamples) {
+              samples.add(ReflectionUtils.copy(job.getConfiguration(),
+                                               reader.getCurrentKey(), null));
+            } else {
+              // When exceeding the maximum number of samples, replace a
+              // random element with this one, then adjust the frequency
+              // to reflect the possibility of existing elements being
+              // pushed out
+              int ind = r.nextInt(numSamples);
+              if (ind != numSamples) {
+                samples.set(ind, ReflectionUtils.copy(job.getConfiguration(),
+                                 reader.getCurrentKey(), null));
+              }
+              freq *= (numSamples - 1) / (double) numSamples;
+            }
+          }
+        }
+        reader.close();
+      }
+      return (K[])samples.toArray();
+    }
+  }
+
+  /**
+   * Sample from s splits at regular intervals.
+   * Useful for sorted data.
+   */
+  public static class IntervalSampler<K,V> implements Sampler<K,V> {
+    private final double freq;
+    private final int maxSplitsSampled;
+
+    /**
+     * Create a new IntervalSampler sampling <em>all</em> splits.
+     * @param freq The frequency with which records will be emitted.
+     */
+    public IntervalSampler(double freq) {
+      this(freq, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Create a new IntervalSampler.
+     * @param freq The frequency with which records will be emitted.
+     * @param maxSplitsSampled The maximum number of splits to examine.
+     * @see #getSample
+     */
+    public IntervalSampler(double freq, int maxSplitsSampled) {
+      this.freq = freq;
+      this.maxSplitsSampled = maxSplitsSampled;
+    }
+
+    /**
+     * For each split sampled, emit when the ratio of the number of records
+     * retained to the total record count is less than the specified
+     * frequency.
+     */
+    @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
+    public K[] getSample(InputFormat<K,V> inf, Job job) 
+        throws IOException, InterruptedException {
+      List<InputSplit> splits = inf.getSplits(job);
+      ArrayList<K> samples = new ArrayList<K>();
+      int splitsToSample = Math.min(maxSplitsSampled, splits.size());
+      long records = 0;
+      long kept = 0;
+      for (int i = 0; i < splitsToSample; ++i) {
+        TaskAttemptContext samplingContext = new TaskAttemptContext(
+            job.getConfiguration(), new TaskAttemptID());
+        RecordReader<K,V> reader = inf.createRecordReader(
+            splits.get(i), samplingContext);
+        reader.initialize(splits.get(i), samplingContext);
+        while (reader.nextKeyValue()) {
+          ++records;
+          if ((double) kept / records < freq) {
+            samples.add(ReflectionUtils.copy(job.getConfiguration(),
+                                 reader.getCurrentKey(), null));
+            ++kept;
+          }
+        }
+        reader.close();
+      }
+      return (K[])samples.toArray();
+    }
+  }
+
+  /**
+   * Write a partition file for the given job, using the Sampler provided.
+   * Queries the sampler for a sample keyset, sorts by the output key
+   * comparator, selects the keys for each rank, and writes to the destination
+   * returned from {@link TotalOrderPartitioner#getPartitionFile}.
+   */
+  @SuppressWarnings("unchecked") // getInputFormat, getOutputKeyComparator
+  public static <K,V> void writePartitionFile(Job job, Sampler<K,V> sampler) 
+      throws IOException, ClassNotFoundException, InterruptedException {
+    Configuration conf = job.getConfiguration();
+    final InputFormat inf = 
+        ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
+    int numPartitions = job.getNumReduceTasks();
+    K[] samples = sampler.getSample(inf, job);
+    LOG.info("Using " + samples.length + " samples");
+    RawComparator<K> comparator =
+      (RawComparator<K>) job.getSortComparator();
+    Arrays.sort(samples, comparator);
+    Path dst = new Path(TotalOrderPartitioner.getPartitionFile(conf));
+    FileSystem fs = dst.getFileSystem(conf);
+    if (fs.exists(dst)) {
+      fs.delete(dst, false);
+    }
+    SequenceFile.Writer writer = SequenceFile.createWriter(fs, 
+      conf, dst, job.getMapOutputKeyClass(), NullWritable.class);
+    NullWritable nullValue = NullWritable.get();
+    float stepSize = samples.length / (float) numPartitions;
+    int last = -1;
+    for(int i = 1; i < numPartitions; ++i) {
+      int k = Math.round(stepSize * i);
+      while (last >= k && comparator.compare(samples[last], samples[k]) == 0) {
+        ++k;
+      }
+      writer.append(samples[k], nullValue);
+      last = k;
+    }
+    writer.close();
+  }
+
+  /**
+   * Driver for InputSampler from the command line.
+   * Configures a Job instance and calls {@link #writePartitionFile}.
+   */
+  public int run(String[] args) throws Exception {
+    Job job = new Job(getConf());
+    ArrayList<String> otherArgs = new ArrayList<String>();
+    Sampler<K,V> sampler = null;
+    for(int i=0; i < args.length; ++i) {
+      try {
+        if ("-r".equals(args[i])) {
+          job.setNumReduceTasks(Integer.parseInt(args[++i]));
+        } else if ("-inFormat".equals(args[i])) {
+          job.setInputFormatClass(
+              Class.forName(args[++i]).asSubclass(InputFormat.class));
+        } else if ("-keyClass".equals(args[i])) {
+          job.setMapOutputKeyClass(
+              Class.forName(args[++i]).asSubclass(WritableComparable.class));
+        } else if ("-splitSample".equals(args[i])) {
+          int numSamples = Integer.parseInt(args[++i]);
+          int maxSplits = Integer.parseInt(args[++i]);
+          if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE;
+          sampler = new SplitSampler<K,V>(numSamples, maxSplits);
+        } else if ("-splitRandom".equals(args[i])) {
+          double pcnt = Double.parseDouble(args[++i]);
+          int numSamples = Integer.parseInt(args[++i]);
+          int maxSplits = Integer.parseInt(args[++i]);
+          if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE;
+          sampler = new RandomSampler<K,V>(pcnt, numSamples, maxSplits);
+        } else if ("-splitInterval".equals(args[i])) {
+          double pcnt = Double.parseDouble(args[++i]);
+          int maxSplits = Integer.parseInt(args[++i]);
+          if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE;
+          sampler = new IntervalSampler<K,V>(pcnt, maxSplits);
+        } else {
+          otherArgs.add(args[i]);
+        }
+      } catch (NumberFormatException except) {
+        System.out.println("ERROR: Integer expected instead of " + args[i]);
+        return printUsage();
+      } catch (ArrayIndexOutOfBoundsException except) {
+        System.out.println("ERROR: Required parameter missing from " +
+            args[i-1]);
+        return printUsage();
+      }
+    }
+    if (job.getNumReduceTasks() <= 1) {
+      System.err.println("Sampler requires more than one reducer");
+      return printUsage();
+    }
+    if (otherArgs.size() < 2) {
+      System.out.println("ERROR: Wrong number of parameters: ");
+      return printUsage();
+    }
+    if (null == sampler) {
+      sampler = new RandomSampler<K,V>(0.1, 10000, 10);
+    }
+
+    Path outf = new Path(otherArgs.remove(otherArgs.size() - 1));
+    TotalOrderPartitioner.setPartitionFile(getConf(), outf);
+    for (String s : otherArgs) {
+      FileInputFormat.addInputPath(job, new Path(s));
+    }
+    InputSampler.<K,V>writePartitionFile(job, sampler);
+
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    InputSampler<?,?> sampler = new InputSampler(new Configuration());
+    int res = ToolRunner.run(sampler, args);
+    System.exit(res);
+  }
+}

Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java
------------------------------------------------------------------------------
    svn:eol-style = native

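Editor's note: the InputSampler added above is normally paired with the TotalOrderPartitioner added later in this commit: a driver points the partitioner at a partition file, asks a sampler for split points, and then submits the job. Below is a minimal driver sketch, not part of the commit; the class name TotalSortDriver and the argument paths are hypothetical, and it relies on the default identity map so that the sampled input keys (Text) match the map output key class, which is what writePartitionFile expects.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
    import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

    public class TotalSortDriver {
      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "total order sort");
        job.setJarByClass(TotalSortDriver.class);
        // Identity map and reduce: keys pass through unchanged, so the
        // sampled input keys have the same type as the map output keys.
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(4);
        SequenceFileInputFormat.addInputPath(job, new Path(args[0]));
        SequenceFileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Tell the partitioner where the split points will live, then have
        // the sampler write the R-1 sorted keys to that file.
        TotalOrderPartitioner.setPartitionFile(job.getConfiguration(),
            new Path(args[1] + ".partitions"));
        job.setPartitionerClass(TotalOrderPartitioner.class);
        InputSampler.RandomSampler<Text, Text> sampler =
            new InputSampler.RandomSampler<Text, Text>(0.1, 10000, 10);
        InputSampler.writePartitionFile(job, sampler);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }

This mirrors what run() above does from the command line: the last path argument becomes the partition file and the remaining arguments become the sampled input paths.
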
Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/partition/KeyFieldBasedComparator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/partition/KeyFieldBasedComparator.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/partition/KeyFieldBasedComparator.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/partition/KeyFieldBasedComparator.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,374 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.partition;
+
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.partition.KeyFieldHelper.KeyDescription;
+
+
+/**
+ * This comparator implementation provides a subset of the features provided
+ * by the Unix/GNU Sort. In particular, the supported features are:
+ * -n, (Sort numerically)
+ * -r, (Reverse the result of comparison)
+ * -k pos1[,pos2], where pos is of the form f[.c][opts], where f is the number
+ *  of the field to use, and c is the number of the first character from the
+ *  beginning of the field. Fields and character posns are numbered starting
+ *  with 1; a character position of zero in pos2 indicates the field's last
+ *  character. If '.c' is omitted from pos1, it defaults to 1 (the beginning
+ *  of the field); if omitted from pos2, it defaults to 0 (the end of the
+ *  field). opts are ordering options (any of 'nr' as described above). 
+ * We assume that the fields in the key are separated by 
+ * {@link JobContext#MAP_OUTPUT_KEY_FIELD_SEPERATOR}.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class KeyFieldBasedComparator<K, V> extends WritableComparator 
+    implements Configurable {
+  private KeyFieldHelper keyFieldHelper = new KeyFieldHelper();
+  public static String COMPARATOR_OPTIONS = "mapreduce.partition.keycomparator.options";
+  private static final byte NEGATIVE = (byte)'-';
+  private static final byte ZERO = (byte)'0';
+  private static final byte DECIMAL = (byte)'.';
+  private Configuration conf;
+
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    String option = conf.get(COMPARATOR_OPTIONS);
+    String keyFieldSeparator = conf.get("mapreduce.map.output.key.field.separator","\t");
+    keyFieldHelper.setKeyFieldSeparator(keyFieldSeparator);
+    keyFieldHelper.parseOption(option);
+  }
+
+  public Configuration getConf() {
+    return conf;
+  }
+  
+  public KeyFieldBasedComparator() {
+    super(Text.class);
+  }
+    
+  public int compare(byte[] b1, int s1, int l1,
+      byte[] b2, int s2, int l2) {
+    int n1 = WritableUtils.decodeVIntSize(b1[s1]);
+    int n2 = WritableUtils.decodeVIntSize(b2[s2]);
+    List <KeyDescription> allKeySpecs = keyFieldHelper.keySpecs();
+
+    if (allKeySpecs.size() == 0) {
+      return compareBytes(b1, s1 + n1, l1 - n1, b2, s2 + n2, l2 - n2);
+    }
+    
+    int []lengthIndicesFirst = 
+      keyFieldHelper.getWordLengths(b1, s1 + n1, s1 + l1);
+    int []lengthIndicesSecond = 
+      keyFieldHelper.getWordLengths(b2, s2 + n2, s2 + l2);
+    
+    for (KeyDescription keySpec : allKeySpecs) {
+      int startCharFirst = keyFieldHelper.getStartOffset(b1, s1 + n1, s1 + l1,
+        lengthIndicesFirst, keySpec);
+      int endCharFirst = keyFieldHelper.getEndOffset(b1, s1 + n1, s1 + l1, 
+        lengthIndicesFirst, keySpec);
+      int startCharSecond = keyFieldHelper.getStartOffset(b2, s2 + n2, s2 + l2,
+        lengthIndicesSecond, keySpec);
+      int endCharSecond = keyFieldHelper.getEndOffset(b2, s2 + n2, s2 + l2, 
+        lengthIndicesSecond, keySpec);
+      int result;
+      if ((result = compareByteSequence(b1, startCharFirst, endCharFirst, b2, 
+          startCharSecond, endCharSecond, keySpec)) != 0) {
+        return result;
+      }
+    }
+    return 0;
+  }
+  
+  private int compareByteSequence(byte[] first, int start1, int end1, 
+      byte[] second, int start2, int end2, KeyDescription key) {
+    if (start1 == -1) {
+      if (key.reverse) {
+        return 1;
+      }
+      return -1;
+    }
+    if (start2 == -1) {
+      if (key.reverse) {
+        return -1; 
+      }
+      return 1;
+    }
+    int compareResult = 0;
+    if (!key.numeric) {
+      compareResult = compareBytes(first, start1, end1-start1 + 1, second,
+        start2, end2 - start2 + 1);
+    }
+    if (key.numeric) {
+      compareResult = numericalCompare (first, start1, end1, second, start2,
+        end2);
+    }
+    if (key.reverse) {
+      return -compareResult;
+    }
+    return compareResult;
+  }
+  
+  private int numericalCompare (byte[] a, int start1, int end1, 
+      byte[] b, int start2, int end2) {
+    int i = start1;
+    int j = start2;
+    int mul = 1;
+    byte first_a = a[i];
+    byte first_b = b[j];
+    if (first_a == NEGATIVE) {
+      if (first_b != NEGATIVE) {
+        //check for cases like -0.0 and 0.0 (they should be declared equal)
+        return oneNegativeCompare(a, start1 + 1, end1, b, start2, end2);
+      }
+      i++;
+    }
+    if (first_b == NEGATIVE) {
+      if (first_a != NEGATIVE) {
+        //check for cases like 0.0 and -0.0 (they should be declared equal)
+        return -oneNegativeCompare(b, start2+1, end2, a, start1, end1);
+      }
+      j++;
+    }
+    if (first_b == NEGATIVE && first_a == NEGATIVE) {
+      mul = -1;
+    }
+
+    //skip over ZEROs
+    while (i <= end1) {
+      if (a[i] != ZERO) {
+        break;
+      }
+      i++;
+    }
+    while (j <= end2) {
+      if (b[j] != ZERO) {
+        break;
+      }
+      j++;
+    }
+    
+    //skip over equal characters and stopping at the first nondigit char
+    //The nondigit character could be '.'
+    while (i <= end1 && j <= end2) {
+      if (!isdigit(a[i]) || a[i] != b[j]) {
+        break;
+      }
+      i++; j++;
+    }
+    if (i <= end1) {
+      first_a = a[i];
+    }
+    if (j <= end2) {
+      first_b = b[j];
+    }
+    //store the result of the difference. This could be final result if the
+    //number of digits in the mantissa is the same in both the numbers 
+    int firstResult = first_a - first_b;
+    
+    //check whether we hit a decimal in the earlier scan
+    if ((first_a == DECIMAL && (!isdigit(first_b) || j > end2)) ||
+            (first_b == DECIMAL && (!isdigit(first_a) || i > end1))) {
+      return ((mul < 0) ? -decimalCompare(a, i, end1, b, j, end2) : 
+        decimalCompare(a, i, end1, b, j, end2));
+    }
+    //check the number of digits in the mantissa of the numbers
+    int numRemainDigits_a = 0;
+    int numRemainDigits_b = 0;
+    while (i <= end1) {
+      //if we encounter a non-digit treat the corresponding number as being 
+      //smaller      
+      if (isdigit(a[i++])) {
+        numRemainDigits_a++;
+      } else break;
+    }
+    while (j <= end2) {
+      //if we encounter a non-digit treat the corresponding number as being 
+      //smaller
+      if (isdigit(b[j++])) {
+        numRemainDigits_b++;
+      } else break;
+    }
+    int ret = numRemainDigits_a - numRemainDigits_b;
+    if (ret == 0) { 
+      return ((mul < 0) ? -firstResult : firstResult);
+    } else {
+      return ((mul < 0) ? -ret : ret);
+    }
+  }
+  private boolean isdigit(byte b) {
+    if ('0' <= b && b <= '9') {
+      return true;
+    }
+    return false;
+  }
+  private int decimalCompare(byte[] a, int i, int end1, 
+                             byte[] b, int j, int end2) {
+    if (i > end1) {
+      //if a[] has nothing remaining
+      return -decimalCompare1(b, ++j, end2);
+    }
+    if (j > end2) {
+      //if b[] has nothing remaining
+      return decimalCompare1(a, ++i, end1);
+    }
+    if (a[i] == DECIMAL && b[j] == DECIMAL) {
+      while (i <= end1 && j <= end2) {
+        if (a[i] != b[j]) {
+          if (isdigit(a[i]) && isdigit(b[j])) {
+            return a[i] - b[j];
+          }
+          if (isdigit(a[i])) {
+            return 1;
+          }
+          if (isdigit(b[j])) {
+            return -1;
+          }
+          return 0;
+        }
+        i++; j++;
+      }
+      if (i > end1 && j > end2) {
+        return 0;
+      }
+        
+      if (i > end1) {
+        //check whether there is a non-ZERO digit after potentially
+        //a number of ZEROs (e.g., a=.4444, b=.444400004)
+        return -decimalCompare1(b, j, end2);
+      }
+      if (j > end2) {
+        //check whether there is a non-ZERO digit after potentially
+        //a number of ZEROs (e.g., b=.4444, a=.444400004)
+        return decimalCompare1(a, i, end1);
+      }
+    }
+    else if (a[i] == DECIMAL) {
+      return decimalCompare1(a, ++i, end1);
+    }
+    else if (b[j] == DECIMAL) {
+      return -decimalCompare1(b, ++j, end2);
+    }
+    return 0;
+  }
+  
+  private int decimalCompare1(byte[] a, int i, int end) {
+    while (i <= end) {
+      if (a[i] == ZERO) {
+        i++;
+        continue;
+      }
+      if (isdigit(a[i])) {
+        return 1;
+      } else {
+        return 0;
+      }
+    }
+    return 0;
+  }
+  
+  private int oneNegativeCompare(byte[] a, int start1, int end1, 
+      byte[] b, int start2, int end2) {
+    //here a[] is negative and b[] is positive
+    //We have to ascertain whether the number contains any digits.
+    //If it does, then it is a smaller number for sure. If not,
+    //then we need to scan b[] to find out whether b[] has a digit
+    //If b[] does contain a digit, then b[] is certainly
+    //greater. If not, that is, both a[] and b[] don't contain
+    //digits then they should be considered equal.
+    if (!isZero(a, start1, end1)) {
+      return -1;
+    }
+    //reached here - this means that a[] is a ZERO
+    if (!isZero(b, start2, end2)) {
+      return -1;
+    }
+    //reached here - both numbers are basically ZEROs and hence
+    //they should compare equal
+    return 0;
+  }
+  
+  private boolean isZero(byte a[], int start, int end) {
+    //check for zeros in the significand part as well as the decimal part
+    //note that we treat the non-digit characters as ZERO
+    int i = start;
+    //we check the significand for being a ZERO
+    while (i <= end) {
+      if (a[i] != ZERO) {
+        if (a[i] != DECIMAL && isdigit(a[i])) {
+          return false;
+        }
+        break;
+      }
+      i++;
+    }
+
+    if (i != (end+1) && a[i++] == DECIMAL) {
+      //we check the decimal part for being a ZERO
+      while (i <= end) {
+        if (a[i] != ZERO) {
+          if (isdigit(a[i])) {
+            return false;
+          }
+          break;
+        }
+        i++;
+      }
+    }
+    return true;
+  }
+  /**
+   * Set the {@link KeyFieldBasedComparator} options used to compare keys.
+   * 
+   * @param keySpec the key specification of the form -k pos1[,pos2], where,
+   *  pos is of the form f[.c][opts], where f is the number
+   *  of the key field to use, and c is the number of the first character from
+   *  the beginning of the field. Fields and character posns are numbered 
+   *  starting with 1; a character position of zero in pos2 indicates the
+   *  field's last character. If '.c' is omitted from pos1, it defaults to 1
+   *  (the beginning of the field); if omitted from pos2, it defaults to 0 
+   *  (the end of the field). opts are ordering options. The supported options
+   *  are:
+   *    -n, (Sort numerically)
+   *    -r, (Reverse the result of comparison)                 
+   */
+  public static void setKeyFieldComparatorOptions(Job job, String keySpec) {
+    job.getConfiguration().set(COMPARATOR_OPTIONS, keySpec);
+  }
+  
+  /**
+   * Get the {@link KeyFieldBasedComparator} options
+   */
+  public static String getKeyFieldComparatorOption(JobContext job) {
+    return job.getConfiguration().get(COMPARATOR_OPTIONS);
+  }
+
+
+}

Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/partition/KeyFieldBasedComparator.java
------------------------------------------------------------------------------
    svn:eol-style = native

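Editor's note: a job opts in to the comparator added above by registering it as the sort comparator and supplying a key spec. A small sketch follows, not part of the commit; the class name FieldSortConfig is hypothetical. It orders Text keys such as "alice<TAB>42" on their second field, numerically and in reverse.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedComparator;

    public class FieldSortConfig {
      public static Job newFieldSortJob(Configuration conf) throws Exception {
        Job job = new Job(conf, "field sort");
        // The comparator operates on Text map output keys.
        job.setMapOutputKeyClass(Text.class);
        // Fields in the key are split on this separator (tab is the default).
        job.getConfiguration().set(
            "mapreduce.map.output.key.field.separator", "\t");
        job.setSortComparatorClass(KeyFieldBasedComparator.class);
        // -k2,2nr: compare field 2 only, numerically (n) and reversed (r).
        KeyFieldBasedComparator.setKeyFieldComparatorOptions(job, "-k2,2nr");
        return job;
      }
    }
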
Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/partition/KeyFieldBasedPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/partition/KeyFieldBasedPartitioner.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/partition/KeyFieldBasedPartitioner.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/partition/KeyFieldBasedPartitioner.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.partition;
+
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.lib.partition.KeyFieldHelper.KeyDescription;
+
+ /**   
+  *  Defines a way to partition keys based on certain key fields (also see
+  *  {@link KeyFieldBasedComparator}).
+  *  The key specification supported is of the form -k pos1[,pos2], where,
+  *  pos is of the form f[.c][opts], where f is the number
+  *  of the key field to use, and c is the number of the first character from
+  *  the beginning of the field. Fields and character posns are numbered 
+  *  starting with 1; a character position of zero in pos2 indicates the
+  *  field's last character. If '.c' is omitted from pos1, it defaults to 1
+  *  (the beginning of the field); if omitted from pos2, it defaults to 0 
+  *  (the end of the field).
+  * 
+  */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class KeyFieldBasedPartitioner<K2, V2> extends Partitioner<K2, V2> 
+    implements Configurable {
+
+  private static final Log LOG = LogFactory.getLog(
+                                   KeyFieldBasedPartitioner.class.getName());
+  public static String PARTITIONER_OPTIONS = 
+    "mapreduce.partition.keypartitioner.options";
+  private int numOfPartitionFields;
+  
+  private KeyFieldHelper keyFieldHelper = new KeyFieldHelper();
+  
+  private Configuration conf;
+
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    String keyFieldSeparator = 
+      conf.get("mapreduce.map.output.key.field.separator", "\t");
+    keyFieldHelper.setKeyFieldSeparator(keyFieldSeparator);
+    if (conf.get("num.key.fields.for.partition") != null) {
+      LOG.warn("Using deprecated num.key.fields.for.partition. " +
+      		"Use mapreduce.partition.keypartitioner.options instead");
+      this.numOfPartitionFields = conf.getInt("num.key.fields.for.partition",0);
+      keyFieldHelper.setKeyFieldSpec(1,numOfPartitionFields);
+    } else {
+      String option = conf.get(PARTITIONER_OPTIONS);
+      keyFieldHelper.parseOption(option);
+    }
+  }
+
+  public Configuration getConf() {
+    return conf;
+  }
+  
+  public int getPartition(K2 key, V2 value, int numReduceTasks) {
+    byte[] keyBytes;
+
+    List <KeyDescription> allKeySpecs = keyFieldHelper.keySpecs();
+    if (allKeySpecs.size() == 0) {
+      return getPartition(key.toString().hashCode(), numReduceTasks);
+    }
+
+    try {
+      keyBytes = key.toString().getBytes("UTF-8");
+    } catch (UnsupportedEncodingException e) {
+      throw new RuntimeException("The current system does not " +
+          "support UTF-8 encoding!", e);
+    }
+    // return 0 if the key is empty
+    if (keyBytes.length == 0) {
+      return 0;
+    }
+    
+    int []lengthIndicesFirst = keyFieldHelper.getWordLengths(keyBytes, 0, 
+        keyBytes.length);
+    int currentHash = 0;
+    for (KeyDescription keySpec : allKeySpecs) {
+      int startChar = keyFieldHelper.getStartOffset(keyBytes, 0, 
+        keyBytes.length, lengthIndicesFirst, keySpec);
+       // no key found! continue
+      if (startChar < 0) {
+        continue;
+      }
+      int endChar = keyFieldHelper.getEndOffset(keyBytes, 0, keyBytes.length, 
+          lengthIndicesFirst, keySpec);
+      currentHash = hashCode(keyBytes, startChar, endChar, 
+          currentHash);
+    }
+    return getPartition(currentHash, numReduceTasks);
+  }
+  
+  protected int hashCode(byte[] b, int start, int end, int currentHash) {
+    for (int i = start; i <= end; i++) {
+      currentHash = 31*currentHash + b[i];
+    }
+    return currentHash;
+  }
+
+  protected int getPartition(int hash, int numReduceTasks) {
+    return (hash & Integer.MAX_VALUE) % numReduceTasks;
+  }
+  
+  /**
+   * Set the {@link KeyFieldBasedPartitioner} options used for 
+   * {@link Partitioner}
+   * 
+   * @param keySpec the key specification of the form -k pos1[,pos2], where,
+   *  pos is of the form f[.c][opts], where f is the number
+   *  of the key field to use, and c is the number of the first character from
+   *  the beginning of the field. Fields and character posns are numbered 
+   *  starting with 1; a character position of zero in pos2 indicates the
+   *  field's last character. If '.c' is omitted from pos1, it defaults to 1
+   *  (the beginning of the field); if omitted from pos2, it defaults to 0 
+   *  (the end of the field).
+   */
+  public void setKeyFieldPartitionerOptions(Job job, String keySpec) {
+    job.getConfiguration().set(PARTITIONER_OPTIONS, keySpec);
+  }
+  
+  /**
+   * Get the {@link KeyFieldBasedPartitioner} options
+   */
+  public String getKeyFieldPartitionerOption(JobContext job) {
+    return job.getConfiguration().get(PARTITIONER_OPTIONS);
+  }
+
+
+}

Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/partition/KeyFieldBasedPartitioner.java
------------------------------------------------------------------------------
    svn:eol-style = native

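Editor's note: the partitioner above is usually combined with KeyFieldBasedComparator for a streaming-style secondary sort: partition on the first key field so all records for a group reach the same reducer, then sort within the group on the second field. A sketch, not part of the commit, with the hypothetical class name SecondarySortConfig; since setKeyFieldPartitionerOptions is an instance method in this version, the sketch sets the PARTITIONER_OPTIONS key directly.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedComparator;
    import org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedPartitioner;

    public class SecondarySortConfig {
      public static Job newSecondarySortJob(Configuration conf) throws Exception {
        Job job = new Job(conf, "secondary sort");
        job.setMapOutputKeyClass(Text.class);
        // Partition on field 1 only.
        job.setPartitionerClass(KeyFieldBasedPartitioner.class);
        job.getConfiguration().set(
            KeyFieldBasedPartitioner.PARTITIONER_OPTIONS, "-k1,1");
        // Sort on field 1, then numerically on field 2.
        job.setSortComparatorClass(KeyFieldBasedComparator.class);
        KeyFieldBasedComparator.setKeyFieldComparatorOptions(job, "-k1,1 -k2,2n");
        return job;
      }
    }
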
Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/partition/KeyFieldHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/partition/KeyFieldHelper.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/partition/KeyFieldHelper.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/partition/KeyFieldHelper.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.partition;
+
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.util.UTF8ByteArrayUtils;
+
+/**
+ * This is used in {@link KeyFieldBasedComparator} & 
+ * {@link KeyFieldBasedPartitioner}. Defines all the methods
+ * for parsing key specifications. The key specification is of the form:
+ * -k pos1[,pos2], where pos is of the form f[.c][opts], where f is the number
+ *  of the field to use, and c is the number of the first character from the
+ *  beginning of the field. Fields and character posns are numbered starting
+ *  with 1; a character position of zero in pos2 indicates the field's last
+ *  character. If '.c' is omitted from pos1, it defaults to 1 (the beginning
+ *  of the field); if omitted from pos2, it defaults to 0 (the end of the
+ *  field). opts are ordering options (supported options are 'nr'). 
+ */
+
+class KeyFieldHelper {
+  
+  protected static class KeyDescription {
+    int beginFieldIdx = 1;
+    int beginChar = 1;
+    int endFieldIdx = 0;
+    int endChar = 0;
+    boolean numeric;
+    boolean reverse;
+    @Override
+    public String toString() {
+      return "-k" 
+             + beginFieldIdx + "." + beginChar + "," 
+             + endFieldIdx + "." + endChar 
+             + (numeric ? "n" : "") + (reverse ? "r" : "");
+    }
+  }
+  
+  private List<KeyDescription> allKeySpecs = new ArrayList<KeyDescription>();
+  private byte[] keyFieldSeparator;
+  private boolean keySpecSeen = false;
+  
+  public void setKeyFieldSeparator(String keyFieldSeparator) {
+    try {
+      this.keyFieldSeparator =
+        keyFieldSeparator.getBytes("UTF-8");
+    } catch (UnsupportedEncodingException e) {
+      throw new RuntimeException("The current system does not " +
+          "support UTF-8 encoding!", e);
+    }    
+  }
+  
+  /** Required for backcompatibility with num.key.fields.for.partition in
+   * {@link KeyFieldBasedPartitioner} */
+  public void setKeyFieldSpec(int start, int end) {
+    if (end >= start) {
+      KeyDescription k = new KeyDescription();
+      k.beginFieldIdx = start;
+      k.endFieldIdx = end;
+      keySpecSeen = true;
+      allKeySpecs.add(k);
+    }
+  }
+  
+  public List<KeyDescription> keySpecs() {
+    return allKeySpecs;
+  }
+    
+  public int[] getWordLengths(byte []b, int start, int end) {
+    //Given a string like "hello how are you", it returns an array
+    //like [4, 5, 3, 3, 3], where the first element is the number of
+    //fields
+    if (!keySpecSeen) {
+      //if there were no key specs, then the whole key is one word
+      return new int[] {1};
+    }
+    int[] lengths = new int[10];
+    int currLenLengths = lengths.length;
+    int idx = 1;
+    int pos;
+    while ((pos = UTF8ByteArrayUtils.findBytes(b, start, end, 
+        keyFieldSeparator)) != -1) {
+      if (++idx == currLenLengths) {
+        int[] temp = lengths;
+        lengths = new int[(currLenLengths = currLenLengths*2)];
+        System.arraycopy(temp, 0, lengths, 0, temp.length);
+      }
+      lengths[idx - 1] = pos - start;
+      start = pos + 1;
+    }
+    
+    if (start != end) {
+      lengths[idx] = end - start;
+    }
+    lengths[0] = idx; //number of words is the first element
+    return lengths;
+  }
+  public int getStartOffset(byte[]b, int start, int end, 
+      int []lengthIndices, KeyDescription k) {
+    //if -k2.5,2 is the keyspec, the startChar is lengthIndices[1] + 5
+    //note that the [0]'th element is the number of fields in the key
+    if (lengthIndices[0] >= k.beginFieldIdx) {
+      int position = 0;
+      for (int i = 1; i < k.beginFieldIdx; i++) {
+        position += lengthIndices[i] + keyFieldSeparator.length; 
+      }
+      if (position + k.beginChar <= (end - start)) {
+        return start + position + k.beginChar - 1; 
+      }
+    }
+    return -1;
+  }
+  public int getEndOffset(byte[]b, int start, int end, 
+      int []lengthIndices, KeyDescription k) {
+    //if -k2,2.8 is the keyspec, the endChar is lengthIndices[1] + 8
+    //note that the [0]'th element is the number of fields in the key
+    if (k.endFieldIdx == 0) {
+      //there is no end field specified for this keyspec. So the remaining
+      //part of the key is considered in its entirety.
+      return end - 1; 
+    }
+    if (lengthIndices[0] >= k.endFieldIdx) {
+      int position = 0;
+      int i;
+      for (i = 1; i < k.endFieldIdx; i++) {
+        position += lengthIndices[i] + keyFieldSeparator.length;
+      }
+      if (k.endChar == 0) { 
+        position += lengthIndices[i];
+      }
+      if (position + k.endChar <= (end - start)) {
+        return start + position + k.endChar - 1;
+      }
+      return end - 1;
+    }
+    return end - 1;
+  }
+  public void parseOption(String option) {
+    if (option == null || option.equals("")) {
+      //we will have only default comparison
+      return;
+    }
+    StringTokenizer args = new StringTokenizer(option);
+    KeyDescription global = new KeyDescription();
+    while (args.hasMoreTokens()) {
+      String arg = args.nextToken();
+      if (arg.equals("-n")) {  
+        global.numeric = true;
+      }
+      if (arg.equals("-r")) {
+        global.reverse = true;
+      }
+      if (arg.equals("-nr")) {
+        global.numeric = true;
+        global.reverse = true;
+      }
+      if (arg.startsWith("-k")) {
+        KeyDescription k = parseKey(arg, args);
+        if (k != null) {
+          allKeySpecs.add(k);
+          keySpecSeen = true;
+        }
+      }
+    }
+    for (KeyDescription key : allKeySpecs) {
+      if (!(key.reverse | key.numeric)) {
+        key.reverse = global.reverse;
+        key.numeric = global.numeric;
+      }
+    }
+    if (allKeySpecs.size() == 0) {
+      allKeySpecs.add(global);
+    }
+  }
+  
+  private KeyDescription parseKey(String arg, StringTokenizer args) {
+    //we allow for -k<arg> and -k <arg>
+    String keyArgs = null;
+    if (arg.length() == 2) {
+      if (args.hasMoreTokens()) {
+        keyArgs = args.nextToken();
+      }
+    } else {
+      keyArgs = arg.substring(2);
+    }
+    if (keyArgs == null || keyArgs.length() == 0) {
+      return null;
+    }
+    StringTokenizer st = new StringTokenizer(keyArgs,"nr.,",true);
+       
+    KeyDescription key = new KeyDescription();
+    
+    String token;
+    //the key is of the form 1[.3][nr][,1.5][nr]
+    if (st.hasMoreTokens()) {
+      token = st.nextToken();
+      //the first token must be a number
+      key.beginFieldIdx = Integer.parseInt(token);
+    }
+    if (st.hasMoreTokens()) {
+      token = st.nextToken();
+      if (token.equals(".")) {
+        token = st.nextToken();
+        key.beginChar = Integer.parseInt(token);
+        if (st.hasMoreTokens()) {
+          token = st.nextToken();
+        } else {
+          return key;
+        }
+      } 
+      do {
+        if (token.equals("n")) {
+          key.numeric = true;
+        }
+        else if (token.equals("r")) {
+          key.reverse = true;
+        }
+        else break;
+        if (st.hasMoreTokens()) {
+          token = st.nextToken();
+        } else {
+          return key;
+        }
+      } while (true);
+      if (token.equals(",")) {
+        token = st.nextToken();
+        //the first token must be a number
+        key.endFieldIdx = Integer.parseInt(token);
+        if (st.hasMoreTokens()) {
+          token = st.nextToken();
+          if (token.equals(".")) {
+            token = st.nextToken();
+            key.endChar = Integer.parseInt(token);
+            if (st.hasMoreTokens()) {
+              token = st.nextToken();
+            } else {
+              return key;
+            }
+          }
+          do {
+            if (token.equals("n")) {
+              key.numeric = true;
+            }
+            else if (token.equals("r")) {
+              key.reverse = true;
+            }
+            else { 
+              throw new IllegalArgumentException("Invalid -k argument. " +
+               "Must be of the form -k pos1,[pos2], where pos is of the form " +
+               "f[.c]nr");
+            }
+            if (st.hasMoreTokens()) {
+              token = st.nextToken();
+            } else {
+              break;
+            }
+          } while (true);
+        }
+        return key;
+      }
+      throw new IllegalArgumentException("Invalid -k argument. " +
+          "Must be of the form -k pos1,[pos2], where pos is of the form " +
+          "f[.c]nr");
+    }
+    return key;
+  }
+  private void printKey(KeyDescription key) {
+    System.out.println("key.beginFieldIdx: " + key.beginFieldIdx);
+    System.out.println("key.beginChar: " + key.beginChar);
+    System.out.println("key.endFieldIdx: " + key.endFieldIdx);
+    System.out.println("key.endChar: " + key.endChar);
+    System.out.println("key.numeric: " + key.numeric);
+    System.out.println("key.reverse: " + key.reverse);
+    System.out.println("parseKey over");
+  }  
+}

Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/partition/KeyFieldHelper.java
------------------------------------------------------------------------------
    svn:eol-style = native

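Editor's note: KeyFieldHelper is package-private, so it is normally exercised only through the comparator and partitioner above. The sketch below, a hypothetical demo class placed in the same package and not part of the commit, shows how a key spec is parsed and how field offsets are resolved against a concrete key.

    package org.apache.hadoop.mapreduce.lib.partition;

    public class KeyFieldHelperDemo {
      public static void main(String[] args) throws Exception {
        KeyFieldHelper helper = new KeyFieldHelper();
        helper.setKeyFieldSeparator("\t");
        // Field 2 starting at its 2nd character, through the end of field 3,
        // with the comparison reversed.
        helper.parseOption("-k2.2,3r");

        byte[] key = "id\tvalue\t42".getBytes("UTF-8");
        // lengths[0] is the field count; the rest are per-field byte lengths.
        int[] lengths = helper.getWordLengths(key, 0, key.length);
        for (KeyFieldHelper.KeyDescription spec : helper.keySpecs()) {
          int start = helper.getStartOffset(key, 0, key.length, lengths, spec);
          int end = helper.getEndOffset(key, 0, key.length, lengths, spec);
          // Prints the spec and the inclusive byte range it selects in the key.
          System.out.println(spec + " -> bytes [" + start + ", " + end + "]");
        }
      }
    }
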
Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/partition/TotalOrderPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/partition/TotalOrderPartitioner.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/partition/TotalOrderPartitioner.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/partition/TotalOrderPartitioner.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,411 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.partition;
+
+import java.io.IOException;
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BinaryComparable;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Partitioner effecting a total order by reading split points from
+ * an externally generated source.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class TotalOrderPartitioner<K extends WritableComparable<?>,V>
+    extends Partitioner<K,V> implements Configurable {
+
+  private Node partitions;
+  public static final String DEFAULT_PATH = "_partition.lst";
+  public static final String PARTITIONER_PATH = 
+    "mapreduce.totalorderpartitioner.path";
+  public static final String MAX_TRIE_DEPTH = 
+    "mapreduce.totalorderpartitioner.trie.maxdepth"; 
+  public static final String NATURAL_ORDER = 
+    "mapreduce.totalorderpartitioner.naturalorder";
+  Configuration conf;
+  private static final Log LOG = LogFactory.getLog(TotalOrderPartitioner.class);
+
+  public TotalOrderPartitioner() { }
+
+  /**
+   * Read in the partition file and build indexing data structures.
+   * If the keytype is {@link org.apache.hadoop.io.BinaryComparable} and
+   * <tt>mapreduce.totalorderpartitioner.naturalorder</tt> is not false, a trie
+   * of the first <tt>mapreduce.totalorderpartitioner.trie.maxdepth</tt> (default 2) + 1 bytes
+   * will be built. Otherwise, keys will be located using a binary search of
+   * the partition keyset using the {@link org.apache.hadoop.io.RawComparator}
+   * defined for this job. The input file must be sorted with the same
+   * comparator and contain {@link Job#getNumReduceTasks()} - 1 keys.
+   */
+  @SuppressWarnings("unchecked") // keytype from conf not static
+  public void setConf(Configuration conf) {
+    try {
+      this.conf = conf;
+      String parts = getPartitionFile(conf);
+      final Path partFile = new Path(parts);
+      final FileSystem fs = (DEFAULT_PATH.equals(parts))
+        ? FileSystem.getLocal(conf)     // assume in DistributedCache
+        : partFile.getFileSystem(conf);
+
+      Job job = new Job(conf);
+      Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
+      K[] splitPoints = readPartitions(fs, partFile, keyClass, conf);
+      if (splitPoints.length != job.getNumReduceTasks() - 1) {
+        throw new IOException("Wrong number of partitions in keyset");
+      }
+      RawComparator<K> comparator =
+        (RawComparator<K>) job.getSortComparator();
+      for (int i = 0; i < splitPoints.length - 1; ++i) {
+        if (comparator.compare(splitPoints[i], splitPoints[i+1]) >= 0) {
+          throw new IOException("Split points are out of order");
+        }
+      }
+      boolean natOrder =
+        conf.getBoolean(NATURAL_ORDER, true);
+      if (natOrder && BinaryComparable.class.isAssignableFrom(keyClass)) {
+        partitions = buildTrie((BinaryComparable[])splitPoints, 0,
+            splitPoints.length, new byte[0],
+            // Now that blocks of identical splitless trie nodes are 
+            // represented reentrantly, and we develop a leaf for any trie
+            // node with only one split point, the only reason for a depth
+            // limit is to avoid stack overflow or bloat in the pathological
+            // case where the split points are long and mostly look like bytes 
+            // iii...iixii...iii   .  Therefore, we make the default depth
+            // limit large but not huge.
+            conf.getInt(MAX_TRIE_DEPTH, 200));
+      } else {
+        partitions = new BinarySearchNode(splitPoints, comparator);
+      }
+    } catch (IOException e) {
+      throw new IllegalArgumentException("Can't read partitions file", e);
+    }
+  }
+
+  public Configuration getConf() {
+    return conf;
+  }
+  
+  // by construction, we know if our keytype
+  @SuppressWarnings("unchecked") // is memcmp-able and uses the trie
+  public int getPartition(K key, V value, int numPartitions) {
+    return partitions.findPartition(key);
+  }
+
+  /**
+   * Set the path to the SequenceFile storing the sorted partition keyset.
+   * It must be the case that for <tt>R</tt> reduces, there are <tt>R-1</tt>
+   * keys in the SequenceFile.
+   */
+  public static void setPartitionFile(Configuration conf, Path p) {
+    conf.set(PARTITIONER_PATH, p.toString());
+  }
+
+  /**
+   * Get the path to the SequenceFile storing the sorted partition keyset.
+   * @see #setPartitionFile(Configuration, Path)
+   */
+  public static String getPartitionFile(Configuration conf) {
+    return conf.get(PARTITIONER_PATH, DEFAULT_PATH);
+  }
+
+  /**
+   * Interface to the partitioner to locate a key in the partition keyset.
+   */
+  interface Node<T> {
+    /**
+     * Locate partition in keyset K, st [Ki..Ki+1) defines a partition,
+     * with implicit K0 = -inf, Kn = +inf, and |K| = #partitions - 1.
+     */
+    int findPartition(T key);
+  }
+
+  /**
+   * Base class for trie nodes. If the keytype is memcomp-able, this builds
+   * tries of the first <tt>mapreduce.totalorderpartitioner.trie.maxdepth</tt>
+   * bytes.
+   */
+  static abstract class TrieNode implements Node<BinaryComparable> {
+    private final int level;
+    TrieNode(int level) {
+      this.level = level;
+    }
+    int getLevel() {
+      return level;
+    }
+  }
+
+  /**
+   * For types that are not {@link org.apache.hadoop.io.BinaryComparable} or
+   * where disabled by <tt>mapreduce.totalorderpartitioner.naturalorder</tt>,
+   * search the partition keyset with a binary search.
+   */
+  class BinarySearchNode implements Node<K> {
+    private final K[] splitPoints;
+    private final RawComparator<K> comparator;
+    BinarySearchNode(K[] splitPoints, RawComparator<K> comparator) {
+      this.splitPoints = splitPoints;
+      this.comparator = comparator;
+    }
+    public int findPartition(K key) {
+      final int pos = Arrays.binarySearch(splitPoints, key, comparator) + 1;
+      return (pos < 0) ? -pos : pos;
+    }
+  }
+
+  /**
+   * An inner trie node that contains 256 children based on the next
+   * character.
+   */
+  class InnerTrieNode extends TrieNode {
+    private TrieNode[] child = new TrieNode[256];
+
+    InnerTrieNode(int level) {
+      super(level);
+    }
+    public int findPartition(BinaryComparable key) {
+      int level = getLevel();
+      if (key.getLength() <= level) {
+        return child[0].findPartition(key);
+      }
+      return child[0xFF & key.getBytes()[level]].findPartition(key);
+    }
+  }
+  
+  /**
+   * @param level        the tree depth at this node
+   * @param splitPoints  the full split point vector, which holds
+   *                     the split point or points this leaf node
+   *                     should contain
+   * @param lower        first INcluded element of splitPoints
+   * @param upper        first EXcluded element of splitPoints
+   * @return  a leaf node.  Leaf nodes come in three kinds: no split points
+   *          [findPartition returns a canned index], one split point
+   *          [we compare against a single comparand], or more than one
+   *          [we do a binary search].  The last case is rare.
+   */
+  private TrieNode LeafTrieNodeFactory
+             (int level, BinaryComparable[] splitPoints, int lower, int upper) {
+      switch (upper - lower) {
+      case 0:
+          return new UnsplitTrieNode(level, lower);
+          
+      case 1:
+          return new SinglySplitTrieNode(level, splitPoints, lower);
+          
+      default:
+          return new LeafTrieNode(level, splitPoints, lower, upper);
+      }
+  }
+
+  /**
+   * A leaf trie node that scans for the key between lower..upper.
+   * 
+   * We don't generate many of these now, since we usually continue trie-ing
+   * when more than one split point remains at this level, and we make
+   * different objects for nodes with 0 or 1 split points.
+   */
+  private class LeafTrieNode extends TrieNode {
+    final int lower;
+    final int upper;
+    final BinaryComparable[] splitPoints;
+    LeafTrieNode(int level, BinaryComparable[] splitPoints, int lower, int upper) {
+      super(level);
+      this.lower = lower;
+      this.upper = upper;
+      this.splitPoints = splitPoints;
+    }
+    public int findPartition(BinaryComparable key) {
+      final int pos = Arrays.binarySearch(splitPoints, lower, upper, key) + 1;
+      return (pos < 0) ? -pos : pos;
+    }
+  }
+  
+  private class UnsplitTrieNode extends TrieNode {
+      final int result;
+      
+      UnsplitTrieNode(int level, int value) {
+          super(level);
+          this.result = value;
+      }
+      
+      public int findPartition(BinaryComparable key) {
+          return result;
+      }
+  }
+  
+  private class SinglySplitTrieNode extends TrieNode {
+      final int               lower;
+      final BinaryComparable  mySplitPoint;
+      
+      SinglySplitTrieNode(int level, BinaryComparable[] splitPoints, int lower) {
+          super(level);
+          this.lower = lower;
+          this.mySplitPoint = splitPoints[lower];
+      }
+      
+      public int findPartition(BinaryComparable key) {
+          return lower + (key.compareTo(mySplitPoint) < 0 ? 0 : 1);
+      }
+  }
+
+
+  /**
+   * Read the cut points from the given sequence file.
+   * @param fs The file system
+   * @param p The path to read
+   * @param keyClass The map output key class
+   * @param conf The job configuration
+   * @throws IOException
+   */
+                                 // matching key types enforced by passing in
+  @SuppressWarnings("unchecked") // map output key class
+  private K[] readPartitions(FileSystem fs, Path p, Class<K> keyClass,
+      Configuration conf) throws IOException {
+    SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, conf);
+    ArrayList<K> parts = new ArrayList<K>();
+    K key = ReflectionUtils.newInstance(keyClass, conf);
+    NullWritable value = NullWritable.get();
+    try {
+      while (reader.next(key, value)) {
+        parts.add(key);
+        key = ReflectionUtils.newInstance(keyClass, conf);
+      }
+      reader.close();
+      reader = null;
+    } finally {
+      IOUtils.cleanup(LOG, reader);
+    }
+    return parts.toArray((K[])Array.newInstance(keyClass, parts.size()));
+  }
+  
+  /**
+   * This object carries a reusable TrieNode when one exists.  Two adjacent
+   * trie node slots that contain no split points can be filled with the
+   * same trie node, even if they are not on the same level.  See
+   * buildTrieRec, below.
+   */
+  private class CarriedTrieNodeRef
+  {
+      TrieNode   content;
+      
+      CarriedTrieNodeRef() {
+          content = null;
+      }
+  }
+
+  
+  /**
+   * Given a sorted set of cut points, build a trie that will find the correct
+   * partition quickly.
+   * @param splits the list of cut points
+   * @param lower the lower bound of partitions 0..numPartitions-1
+   * @param upper the upper bound of partitions 0..numPartitions-1
+   * @param prefix the prefix that we have already checked against
+   * @param maxDepth the maximum depth we will build a trie for
+   * @return the trie node that will divide the splits correctly
+   */
+  private TrieNode buildTrie(BinaryComparable[] splits, int lower,
+          int upper, byte[] prefix, int maxDepth) {
+      return buildTrieRec
+               (splits, lower, upper, prefix, maxDepth, new CarriedTrieNodeRef());
+  }
+  
+  /**
+   * This is the core of buildTrie.  The stub above just supplies an empty
+   * CarriedTrieNodeRef.
+   * 
+   * We build trie nodes in depth first order, which is also in key space
+   * order.  Every leaf node is referenced as a slot in a parent internal
+   * node.  If two adjacent slots [in the DFO] hold leaf nodes that have
+   * no split point, then they are not separated by a split point either, 
+   * because there's no place in key space for that split point to exist.
+   * 
+   * When that happens, the leaf nodes would be semantically identical, and
+   * we reuse the object.  A single CarriedTrieNodeRef "ref" lives for the 
+   * duration of the tree-walk.  ref carries a potentially reusable, unsplit
+   * leaf node for such reuse until a leaf node with a split arises, which 
+   * breaks the chain until we need to make a new unsplit leaf node.
+   * 
+   * Note that this use of CarriedTrieNodeRef means that, even if this code
+   * is modified, internal nodes must still create or fill in their subnodes
+   * in key space order.
+   */
+  private TrieNode buildTrieRec(BinaryComparable[] splits, int lower,
+      int upper, byte[] prefix, int maxDepth, CarriedTrieNodeRef ref) {
+    final int depth = prefix.length;
+    // We generate leaves for a single split point as well as for 
+    // no split points.
+    if (depth >= maxDepth || lower >= upper - 1) {
+        // If we have two consecutive requests for an unsplit trie node, we
+        // can deliver the same one the second time.
+        if (lower == upper && ref.content != null) {
+            return ref.content;
+        }
+        TrieNode  result = LeafTrieNodeFactory(depth, splits, lower, upper);
+        ref.content = lower == upper ? result : null;
+        return result;
+    }
+    InnerTrieNode result = new InnerTrieNode(depth);
+    byte[] trial = Arrays.copyOf(prefix, prefix.length + 1);
+    // append an extra byte onto the prefix
+    int         currentBound = lower;
+    for(int ch = 0; ch < 0xFF; ++ch) {
+      trial[depth] = (byte) (ch + 1);
+      lower = currentBound;
+      while (currentBound < upper) {
+        if (splits[currentBound].compareTo(trial, 0, trial.length) >= 0) {
+          break;
+        }
+        currentBound += 1;
+      }
+      trial[depth] = (byte) ch;
+      result.child[0xFF & ch]
+                   = buildTrieRec(splits, lower, currentBound, trial, maxDepth, ref);
+    }
+    // pick up the rest
+    trial[depth] = (byte)0xFF;
+    result.child[0xFF] 
+                 = buildTrieRec(splits, lower, currentBound, trial, maxDepth, ref);
+    
+    return result;
+  }
+}

Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/partition/TotalOrderPartitioner.java
------------------------------------------------------------------------------
    svn:eol-style = native
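
For readers wiring the partitioner into a job: below is a minimal driver
sketch, not part of the commit, assuming Text keys and values read from
sequence files, four reducers, and illustrative paths and sampler parameters.
Only TotalOrderPartitioner.setPartitionFile and the InputSampler API come from
the added classes; everything else is an assumption.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
    import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

    public class TotalSortDriver {
      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "total-order-sort");
        job.setJarByClass(TotalSortDriver.class);
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(4);              // R reduces need R-1 split keys
        job.setPartitionerClass(TotalOrderPartitioner.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Partition file written by the sampler and read by the partitioner.
        Path partitionFile = new Path(args[1] + "_partitions");
        TotalOrderPartitioner.setPartitionFile(job.getConfiguration(),
            partitionFile);

        // Sample roughly 10% of the keys, capped at 10000 samples drawn from
        // at most 10 splits, and write the R-1 chosen split points.
        InputSampler.Sampler<Text, Text> sampler =
            new InputSampler.RandomSampler<Text, Text>(0.1, 10000, 10);
        InputSampler.writePartitionFile(job, sampler);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }

The identity Mapper and Reducer defaults are enough here; the total order
comes from the partitioner routing each disjoint key range to its own reducer.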

Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/TestMapReduceLazyOutput.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/TestMapReduceLazyOutput.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/TestMapReduceLazyOutput.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/TestMapReduceLazyOutput.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.Arrays;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapred.Utils;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+
+/**
+ * A JUnit test for the Map-Reduce framework's feature of creating part
+ * files only when a task explicitly emits output.  This helps prevent
+ * zero-byte part files.
+ */
+public class TestMapReduceLazyOutput extends TestCase {
+  private static final int NUM_HADOOP_SLAVES = 3;
+  private static final int NUM_MAPS_PER_NODE = 2;
+  private static final Path INPUT = new Path("/testlazy/input");
+
+  private static final List<String> input = 
+    Arrays.asList("All","Roads","Lead","To","Hadoop");
+
+  public static class TestMapper 
+  extends Mapper<LongWritable, Text, LongWritable, Text>{
+
+    public void map(LongWritable key, Text value, Context context
+    ) throws IOException, InterruptedException {
+      String id = context.getTaskAttemptID().toString();
+      // Mapper 0 does not output anything
+      if (!id.endsWith("0_0")) {
+        context.write(key, value);
+      }
+    }
+  }
+
+
+  public static class TestReducer 
+  extends Reducer<LongWritable,Text,LongWritable,Text> {
+    
+    public void reduce(LongWritable key, Iterable<Text> values, 
+        Context context) throws IOException, InterruptedException {
+      String id = context.getTaskAttemptID().toString();
+      // Reducer 0 does not output anything
+      if (!id.endsWith("0_0")) {
+        for (Text val: values) {
+          context.write(key, val);
+        }
+      }
+    }
+  }
+  
+  private static void runTestLazyOutput(Configuration conf, Path output,
+      int numReducers, boolean createLazily) 
+  throws Exception {
+    Job job = new Job(conf, "Test-Lazy-Output");
+
+    FileInputFormat.setInputPaths(job, INPUT);
+    FileOutputFormat.setOutputPath(job, output);
+
+    job.setJarByClass(TestMapReduceLazyOutput.class);
+    job.setInputFormatClass(TextInputFormat.class);
+    job.setOutputKeyClass(LongWritable.class);
+    job.setOutputValueClass(Text.class);
+    job.setNumReduceTasks(numReducers);
+
+    job.setMapperClass(TestMapper.class);
+    job.setReducerClass(TestReducer.class);
+
+    if (createLazily) {
+      LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
+    } else {
+      job.setOutputFormatClass(TextOutputFormat.class);
+    }
+    assertTrue(job.waitForCompletion(true));
+  }
+
+  public void createInput(FileSystem fs, int numMappers) throws Exception {
+    for (int i = 0; i < numMappers; i++) {
+      OutputStream os = fs.create(new Path(INPUT, 
+        "text" + i + ".txt"));
+      Writer wr = new OutputStreamWriter(os);
+      for(String inp : input) {
+        wr.write(inp+"\n");
+      }
+      wr.close();
+    }
+  }
+
+
+  public void testLazyOutput() throws Exception {
+    MiniDFSCluster dfs = null;
+    MiniMRCluster mr = null;
+    FileSystem fileSys = null;
+    try {
+      Configuration conf = new Configuration();
+
+      // Start the mini-MR and mini-DFS clusters
+      dfs = new MiniDFSCluster(conf, NUM_HADOOP_SLAVES, true, null);
+      fileSys = dfs.getFileSystem();
+      mr = new MiniMRCluster(NUM_HADOOP_SLAVES, fileSys.getUri().toString(), 1);
+
+      int numReducers = 2;
+      int numMappers = NUM_HADOOP_SLAVES * NUM_MAPS_PER_NODE;
+
+      createInput(fileSys, numMappers);
+      Path output1 = new Path("/testlazy/output1");
+
+      // Test 1. 
+      runTestLazyOutput(mr.createJobConf(), output1, 
+          numReducers, true);
+
+      Path[] fileList = 
+        FileUtil.stat2Paths(fileSys.listStatus(output1,
+            new Utils.OutputFileUtils.OutputFilesFilter()));
+      for(int i=0; i < fileList.length; ++i) {
+        System.out.println("Test1 File list[" + i + "]" + ": "+ fileList[i]);
+      }
+      assertTrue(fileList.length == (numReducers - 1));
+
+      // Test 2. 0 Reducers, maps directly write to the output files
+      Path output2 = new Path("/testlazy/output2");
+      runTestLazyOutput(mr.createJobConf(), output2, 0, true);
+
+      fileList =
+        FileUtil.stat2Paths(fileSys.listStatus(output2,
+            new Utils.OutputFileUtils.OutputFilesFilter()));
+      for(int i=0; i < fileList.length; ++i) {
+        System.out.println("Test2 File list[" + i + "]" + ": "+ fileList[i]);
+      }
+
+      assertTrue(fileList.length == numMappers - 1);
+
+      // Test 3. 0 Reducers, but flag is turned off
+      Path output3 = new Path("/testlazy/output3");
+      runTestLazyOutput(mr.createJobConf(), output3, 0, false);
+
+      fileList =
+        FileUtil.stat2Paths(fileSys.listStatus(output3,
+            new Utils.OutputFileUtils.OutputFilesFilter()));
+      for(int i=0; i < fileList.length; ++i) {
+        System.out.println("Test3 File list[" + i + "]" + ": "+ fileList[i]);
+      }
+
+      assertTrue(fileList.length == numMappers);
+
+    } finally {
+      if (dfs != null) { dfs.shutdown(); }
+      if (mr != null) { mr.shutdown(); }
+    }
+  }
+
+}

Propchange: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/TestMapReduceLazyOutput.java
------------------------------------------------------------------------------
    svn:eol-style = native
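
The behaviour exercised by this test comes down to a single driver call; here
is a minimal sketch, not part of the commit, in which only
LazyOutputFormat.setOutputFormatClass and TextOutputFormat are taken from the
code above and everything else is assumed:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    public class LazyOutputExample {
      public static Job configure() throws Exception {
        Job job = new Job(new Configuration(), "lazy-output-example");
        // The eager default, job.setOutputFormatClass(TextOutputFormat.class),
        // creates a part file for every task, even one that never writes.
        // Wrapping the real format defers file creation until the first record
        // is written, so tasks that emit nothing leave no part file behind.
        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
        return job;
      }
    }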

Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/db/TestDBJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/db/TestDBJob.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/db/TestDBJob.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/db/TestDBJob.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.db;
+
+import java.io.IOException;
+
+import org.apache.hadoop.examples.DBCountPageView;
+import org.apache.hadoop.mapred.HadoopTestCase;
+import org.apache.hadoop.util.ToolRunner;
+
+
+public class TestDBJob extends HadoopTestCase {
+
+  public TestDBJob() throws IOException {
+    super(LOCAL_MR, LOCAL_FS, 3, 1);
+  }
+  
+  public void testRun() throws Exception {
+    DBCountPageView testDriver = new DBCountPageView();
+    ToolRunner.run(createJobConf(), testDriver, new String[0]);
+  }
+  
+}

Propchange: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/db/TestDBJob.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/db/TestDBOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/db/TestDBOutputFormat.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/db/TestDBOutputFormat.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/db/TestDBOutputFormat.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.db;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+
+public class TestDBOutputFormat extends TestCase {
+  
+  private String[] fieldNames = new String[] { "id", "name", "value" };
+  private String[] nullFieldNames = new String[] { null, null, null };
+  private String expected = "INSERT INTO hadoop_output " +
+                             "(id,name,value) VALUES (?,?,?);";
+  private String nullExpected = "INSERT INTO hadoop_output VALUES (?,?,?);"; 
+  
+  private DBOutputFormat<DBWritable, NullWritable> format 
+    = new DBOutputFormat<DBWritable, NullWritable>();
+  
+  public void testConstructQuery() {  
+    String actual = format.constructQuery("hadoop_output", fieldNames);
+    assertEquals(expected, actual);
+    
+    actual = format.constructQuery("hadoop_output", nullFieldNames);
+    assertEquals(nullExpected, actual);
+  }
+  
+  public void testSetOutput() throws IOException {
+    Job job = new Job(new Configuration());
+    DBOutputFormat.setOutput(job, "hadoop_output", fieldNames);
+    
+    DBConfiguration dbConf = new DBConfiguration(job.getConfiguration());
+    String actual = format.constructQuery(dbConf.getOutputTableName()
+        , dbConf.getOutputFieldNames());
+    
+    assertEquals(expected, actual);
+    
+    job = new Job(new Configuration());
+    dbConf = new DBConfiguration(job.getConfiguration());
+    DBOutputFormat.setOutput(job, "hadoop_output", nullFieldNames.length);
+    assertNull(dbConf.getOutputFieldNames());
+    assertEquals(nullFieldNames.length, dbConf.getOutputFieldCount());
+    
+    actual = format.constructQuery(dbConf.getOutputTableName()
+        , new String[dbConf.getOutputFieldCount()]);
+    
+    assertEquals(nullExpected, actual);
+  }
+  
+}

Propchange: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/db/TestDBOutputFormat.java
------------------------------------------------------------------------------
    svn:eol-style = native
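
For context on how the INSERT statement asserted above is used at run time,
here is a minimal export-side driver sketch. It is not part of the commit; the
JDBC driver, URL and table layout are placeholders, and only
DBConfiguration.configureDB and DBOutputFormat.setOutput come from the classes
under test.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
    import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;

    public class DbExportDriver {
      public static Job configure() throws Exception {
        Job job = new Job(new Configuration(), "db-export");
        // Placeholder JDBC driver, URL and credentials.
        DBConfiguration.configureDB(job.getConfiguration(),
            "org.hsqldb.jdbcDriver", "jdbc:hsqldb:hsql://localhost/dbname",
            null, null);
        job.setOutputFormatClass(DBOutputFormat.class);
        // Builds an "INSERT INTO hadoop_output (id,name,value) VALUES (?,?,?)"
        // prepared statement that each emitted key is bound to via the
        // DBWritable.write(PreparedStatement) method.
        DBOutputFormat.setOutput(job, "hadoop_output", "id", "name", "value");
        return job;
      }
    }

The job's output key class must then be a DBWritable whose
write(PreparedStatement) fills in the three placeholders.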

Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/db/TestDataDrivenDBInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/db/TestDataDrivenDBInputFormat.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/db/TestDataDrivenDBInputFormat.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/db/TestDataDrivenDBInputFormat.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.db;
+
+import java.sql.*;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.examples.DBCountPageView;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.HadoopTestCase;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.db.*;
+import org.apache.hadoop.mapreduce.lib.input.*;
+import org.apache.hadoop.mapreduce.lib.output.*;
+import org.apache.hadoop.util.StringUtils;
+import org.hsqldb.Server;
+
+/**
+ * Test aspects of DataDrivenDBInputFormat
+ */
+public class TestDataDrivenDBInputFormat extends HadoopTestCase {
+
+  private static final Log LOG = LogFactory.getLog(
+      TestDataDrivenDBInputFormat.class);
+
+  private static final String DB_NAME = "dddbif";
+  private static final String DB_URL = 
+    "jdbc:hsqldb:hsql://localhost/" + DB_NAME;
+  private static final String DRIVER_CLASS = "org.hsqldb.jdbcDriver";
+
+  private Server server;
+  private Connection connection;
+
+  private static final String OUT_DIR;
+
+  public TestDataDrivenDBInputFormat() throws IOException {
+    super(LOCAL_MR, LOCAL_FS, 1, 1);
+  }
+
+  static {
+    OUT_DIR = System.getProperty("test.build.data", "/tmp") + "/dddbifout";
+  }
+
+  private void startHsqldbServer() {
+    if (null == server) {
+      server = new Server();
+      server.setDatabasePath(0,
+          System.getProperty("test.build.data", "/tmp") + "/" + DB_NAME);
+      server.setDatabaseName(0, DB_NAME);
+      server.start();
+    }
+  }
+
+  private void createConnection(String driverClassName,
+      String url) throws Exception {
+
+    Class.forName(driverClassName);
+    connection = DriverManager.getConnection(url);
+    connection.setAutoCommit(false);
+  }
+
+  private void shutdown() {
+    try {
+      connection.commit();
+      connection.close();
+      connection = null;
+    } catch (Throwable ex) {
+      LOG.warn("Exception occurred while closing connection :"
+          + StringUtils.stringifyException(ex));
+    } finally {
+      try {
+        if(server != null) {
+          server.shutdown();
+        }
+      } catch (Throwable ex) {
+        LOG.warn("Exception occurred while shutting down HSQLDB :"
+            + StringUtils.stringifyException(ex));
+      }
+      server = null;
+    }
+  }
+
+  private void initialize(String driverClassName, String url)
+      throws Exception {
+    startHsqldbServer();
+    createConnection(driverClassName, url);
+  }
+
+  public void setUp() throws Exception {
+    initialize(DRIVER_CLASS, DB_URL);
+    super.setUp();
+  }
+
+  public void tearDown() throws Exception {
+    super.tearDown();
+    shutdown();
+  }
+
+
+
+  public static class DateCol implements DBWritable, WritableComparable {
+    Date d;
+
+    public String toString() {
+      return d.toString();
+    }
+
+    public void readFields(ResultSet rs) throws SQLException {
+      d = rs.getDate(1);
+    }
+
+    public void write(PreparedStatement ps) {
+      // not needed.
+    }
+
+    public void readFields(DataInput in) throws IOException {
+      long v = in.readLong();
+      d = new Date(v);
+    }
+
+    public void write(DataOutput out) throws IOException {
+      out.writeLong(d.getTime());
+    }
+
+    @Override
+    public int hashCode() {
+      return (int) d.getTime();
+    }
+
+    @Override
+    public int compareTo(Object o) {
+      if (o instanceof DateCol) {
+        Long v = Long.valueOf(d.getTime());
+        Long other = Long.valueOf(((DateCol) o).d.getTime());
+        return v.compareTo(other);
+      } else {
+        return -1;
+      }
+    }
+  }
+
+  public static class ValMapper
+      extends Mapper<Object, Object, Object, NullWritable> {
+    public void map(Object k, Object v, Context c)
+        throws IOException, InterruptedException {
+      c.write(v, NullWritable.get());
+    }
+  }
+
+  public void testDateSplits() throws Exception {
+    Statement s = connection.createStatement();
+    final String DATE_TABLE = "datetable";
+    final String COL = "foo";
+    try {
+      // delete the table if it already exists.
+      s.executeUpdate("DROP TABLE " + DATE_TABLE);
+    } catch (SQLException e) {
+    }
+
+    // Create the table.
+    s.executeUpdate("CREATE TABLE " + DATE_TABLE + "(" + COL + " TIMESTAMP)");
+    s.executeUpdate("INSERT INTO " + DATE_TABLE + " VALUES('2010-04-01')");
+    s.executeUpdate("INSERT INTO " + DATE_TABLE + " VALUES('2010-04-02')");
+    s.executeUpdate("INSERT INTO " + DATE_TABLE + " VALUES('2010-05-01')");
+    s.executeUpdate("INSERT INTO " + DATE_TABLE + " VALUES('2011-04-01')");
+
+    // commit this tx.
+    connection.commit();
+
+    Configuration conf = new Configuration();
+    conf.set("fs.defaultFS", "file:///");
+    FileSystem fs = FileSystem.getLocal(conf);
+    fs.delete(new Path(OUT_DIR), true);
+
+    // now do a dd import
+    Job job = new Job(conf);
+    job.setMapperClass(ValMapper.class);
+    job.setReducerClass(Reducer.class);
+    job.setMapOutputKeyClass(DateCol.class);
+    job.setMapOutputValueClass(NullWritable.class);
+    job.setOutputKeyClass(DateCol.class);
+    job.setOutputValueClass(NullWritable.class);
+    job.setNumReduceTasks(1);
+    job.getConfiguration().setInt("mapreduce.map.tasks", 2);
+    FileOutputFormat.setOutputPath(job, new Path(OUT_DIR));
+    DBConfiguration.configureDB(job.getConfiguration(), DRIVER_CLASS,
+        DB_URL, null, null);
+    DataDrivenDBInputFormat.setInput(job, DateCol.class, DATE_TABLE, null,
+        COL, COL);
+
+    boolean ret = job.waitForCompletion(true);
+    assertTrue("job failed", ret);
+
+    // Check to see that we imported as much as we thought we did.
+    assertEquals("Did not get all the records", 4,
+        job.getCounters().findCounter("org.apache.hadoop.mapred.Task$Counter",
+          "REDUCE_OUTPUT_RECORDS").getValue());
+  }
+}

Propchange: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/db/TestDataDrivenDBInputFormat.java
------------------------------------------------------------------------------
    svn:eol-style = native
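
A matching import-side driver sketch, not part of the commit, along the lines
of testDateSplits above; the connection settings are placeholders and the
record class is whatever DBWritable the table maps to (DateCol in the test
above).

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
    import org.apache.hadoop.mapreduce.lib.db.DBWritable;
    import org.apache.hadoop.mapreduce.lib.db.DataDrivenDBInputFormat;

    public class DbImportDriver {
      public static Job configure(Class<? extends DBWritable> recordClass)
          throws Exception {
        Job job = new Job(new Configuration(), "db-import");
        // Placeholder JDBC driver, URL and credentials.
        DBConfiguration.configureDB(job.getConfiguration(),
            "org.hsqldb.jdbcDriver", "jdbc:hsqldb:hsql://localhost/dddbif",
            null, null);
        // setInput also registers DataDrivenDBInputFormat as the input format.
        // Splits are computed from the range of the split-by column ("foo"),
        // so each map task reads a disjoint slice of that column instead of a
        // LIMIT/OFFSET window.
        DataDrivenDBInputFormat.setInput(job, recordClass,
            "datetable", null /* conditions */, "foo" /* splitBy */, "foo");
        return job;
      }
    }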


